--------------------------------------------------------------------------------
-- ai 0.12.0 (x-release-please-version)


-- restrict unqualified name lookup to the system catalog (temp schema last) for the
-- remainder of the current transaction; SET LOCAL reverts when the transaction ends
set local search_path to pg_catalog, pg_temp;

/*
make sure that the user doing the install/upgrade is the same user who owns the
migration table (or, when the table is owned by pg_database_owner, the owner of
the current database). abort the install/upgrade otherwise.
*/

-- the bootstrap step creates ai.pgai_lib_migration, so the schema must exist first
create schema if not exists ai;


do $bootstrap_pgai_lib$
declare
    _current_user_id oid = null;
    _migration_table_owner_id oid = null;
    _database_owner_id oid = null;
begin
    -- resolve the oid of the role running this script. to_regrole() parses its argument
    -- as an SQL identifier (unquoted names are folded to lower case), so the role name is
    -- quoted first; otherwise roles such as "Admin" would not be found.
    select pg_catalog.to_regrole(pg_catalog.quote_ident(current_user))::oid
    into strict _current_user_id;

    -- owner of ai.pgai_lib_migration, or null when the table does not exist yet
    select k.relowner into _migration_table_owner_id
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.relname operator(pg_catalog.=) 'pgai_lib_migration'
    and n.nspname operator(pg_catalog.=) 'ai';

    if _migration_table_owner_id is not null
    and _migration_table_owner_id is distinct from _current_user_id then
        -- a table owned by pg_database_owner may also be migrated by the owner of the
        -- current database. if that role does not exist on this server, to_regrole()
        -- returns null, the comparison is not true, and the else branch rejects the user.
        if _migration_table_owner_id operator(pg_catalog.=) pg_catalog.to_regrole('pg_database_owner') then
            select d.datdba into strict _database_owner_id
            from pg_catalog.pg_database d
            where d.datname operator(pg_catalog.=) pg_catalog.current_database();

            if _database_owner_id is distinct from _current_user_id then
                raise exception 'only the owner of the ai.pgai_lib_migration table can run database migrations';
            end if;
        else
            raise exception 'only the owner of the ai.pgai_lib_migration table can run database migrations';
        end if;
    end if;

    -- first install: create the table that records which migrations have been applied
    if _migration_table_owner_id is null then
        create table ai.pgai_lib_migration
        ( "name" text not null primary key
        , applied_at_version text not null
        , applied_at timestamptz not null default pg_catalog.clock_timestamp()
        , body text not null
        );
    end if;
end;
$bootstrap_pgai_lib$;

-- serialize installs/upgrades: take the strongest table lock on the migration table;
-- it is held until the surrounding transaction ends
lock table ai.pgai_lib_migration in access exclusive mode;

-- one row per feature flag that was enabled while installing a prerelease
-- version of the extension, plus the version and time it was recorded
create table if not exists ai.pgai_lib_feature_flag
( "name"             text        not null primary key
, applied_at_version text        not null
, applied_at         timestamptz not null default pg_catalog.clock_timestamp()
);

-- installed version keyed by component name (this script reads and writes the 'ai' row)
create table if not exists ai.pgai_lib_version
( "name"       text        not null primary key
, version      text        not null
, installed_at timestamptz not null default pg_catalog.clock_timestamp()
);

-- refuse to run again when the 'ai' row already carries this exact version
do $$
begin
    if exists
    (
        select 1
        from ai.pgai_lib_version v
        where v."name" operator(pg_catalog.=) 'ai'
        and v.version operator(pg_catalog.=) '__version__'
    ) then
        raise exception 'the pgai library has already been installed/upgraded' using errcode = '42710';
    end if;
end;
$$;

-- record the installed version of the 'ai' component; on upgrade only the version
-- column is overwritten (installed_at keeps its original value)
insert into ai.pgai_lib_version ("name", version)
values ('ai', '__version__')
on conflict ("name")
do update set version = excluded.version;


-------------------------------------------------------------------------------
-- 001-vectorizer.sql
do $outer_migration_block$ /*001-vectorizer.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$001-vectorizer.sql$migration_name$;
    _migration_body text =
$migration_body$

create table ai.vectorizer
( id int not null primary key generated by default as identity
, source_schema name not null
, source_table name not null
, source_pk jsonb not null
, target_schema name not null
, target_table name not null
, view_schema name not null
, view_name name not null
, trigger_name name not null
, queue_schema name
, queue_table name
, config jsonb not null
, unique (target_schema, target_table)
);

create table ai.vectorizer_errors
( id int not null references ai.vectorizer (id) on delete cascade
, message text
, details jsonb
, recorded timestamptz not null default now()
);
create index on ai.vectorizer_errors (id, recorded);

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 003-vec-storage.sql
do $outer_migration_block$ /*003-vec-storage.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$003-vec-storage.sql$migration_name$;
    _migration_body text =
$migration_body$

-- switch the vector columns from storage external to storage main
do language plpgsql $block$
declare
    _sql pg_catalog.text;
begin
    for _sql in
    (
        select pg_catalog.format
        ( $sql$alter table %I.%I alter column embedding set storage main$sql$
        , v.target_schema
        , v.target_table
        )
        from ai.vectorizer v
        inner join pg_catalog.pg_class k on (k.relname operator(pg_catalog.=) v.target_table)
        inner join pg_catalog.pg_namespace n
            on (k.relnamespace operator(pg_catalog.=) n.oid and n.nspname operator(pg_catalog.=) v.target_schema)
        inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
        where a.attname operator(pg_catalog.=) 'embedding'
        and a.attstorage not in ('m', 'p') -- not main or plain
    )
    loop
        raise info '%', _sql;
        execute _sql;
    end loop;
end;
$block$;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 005-vectorizer-queue-pending.sql
do $outer_migration_block$ /*005-vectorizer-queue-pending.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$005-vectorizer-queue-pending.sql$migration_name$;
    _migration_body text =
$migration_body$

-- we added a new parameter which changes the signature producing a new function
-- drop the old function if it exists from a prior extension version
-- we cascade drop because the ai.vectorizer_status view depends on this function
-- we'll immediate recreate the view, so we should be good
drop function if exists ai.vectorizer_queue_pending(int) cascade;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 006-drop-vectorizer.sql
do $outer_migration_block$ /*006-drop-vectorizer.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$006-drop-vectorizer.sql$migration_name$;
    _migration_body text =
$migration_body$
drop function if exists ai.drop_vectorizer(int) cascade;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 012-add-vectorizer-disabled-column.sql
do $outer_migration_block$ /*012-add-vectorizer-disabled-column.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$012-add-vectorizer-disabled-column.sql$migration_name$;
    _migration_body text =
$migration_body$
alter table ai.vectorizer add column disabled boolean not null default false;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 017-upgrade-source-pk.sql
do $outer_migration_block$ /*017-upgrade-source-pk.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$017-upgrade-source-pk.sql$migration_name$;
    _migration_body text =
$migration_body$

do language plpgsql $block$
declare
    _vec ai.vectorizer;
    _source pg_catalog.oid;
    _source_pk pg_catalog.jsonb;
begin
    for _vec in (select * from ai.vectorizer)
    loop
        _source = pg_catalog.to_regclass(pg_catalog.format('%I.%I', _vec.source_schema, _vec.source_table));
        if _source is null then
            continue;
        end if;
        
        select pg_catalog.jsonb_agg(x) into _source_pk
        from
        (
            select e.attnum, e.pknum, a.attname, pg_catalog.format_type(y.oid, a.atttypmod) as typname
            from pg_catalog.pg_constraint k
            cross join lateral pg_catalog.unnest(k.conkey) with ordinality e(attnum, pknum)
            inner join pg_catalog.pg_attribute a
                on (k.conrelid operator(pg_catalog.=) a.attrelid
                    and e.attnum operator(pg_catalog.=) a.attnum)
            inner join pg_catalog.pg_type y on (a.atttypid operator(pg_catalog.=) y.oid)
            where k.conrelid operator(pg_catalog.=) _source
            and k.contype operator(pg_catalog.=) 'p'
        ) x;
        
        if _source_pk is null then
            continue;
        end if;
        
        update ai.vectorizer u set source_pk = _source_pk
        where u.id = _vec.id
        ;
    end loop;
end;
$block$;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 018-drop-foreign-key-constraint.sql
do $outer_migration_block$ /*018-drop-foreign-key-constraint.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$018-drop-foreign-key-constraint.sql$migration_name$;
    _migration_body text =
$migration_body$
do language plpgsql $block$
DECLARE
    _vectorizer RECORD;
    _constraint_name text;
    _sql text;
BEGIN
    -- Loop through all vectorizers
    FOR _vectorizer IN 
        SELECT 
            v.id,
            v.target_schema,
            v.target_table,
            v.source_schema,
            v.source_table
        FROM ai.vectorizer v
    LOOP
        -- Find the foreign key constraint for this vectorizer's store table
        SELECT conname INTO _constraint_name
        FROM pg_constraint c
        JOIN pg_class t ON c.conrelid = t.oid
        JOIN pg_namespace n ON t.relnamespace = n.oid
        JOIN pg_class t2 ON c.confrelid = t2.oid
        JOIN pg_namespace n2 ON t2.relnamespace = n2.oid
        WHERE n.nspname = _vectorizer.target_schema
        AND t.relname = _vectorizer.target_table
        AND n2.nspname = _vectorizer.source_schema
        AND t2.relname = _vectorizer.source_table
        AND c.contype = 'f';

        IF _constraint_name IS NOT NULL THEN
            -- Build and execute the ALTER TABLE command to drop the constraint
            _sql := format(
                'ALTER TABLE %I.%I DROP CONSTRAINT %I',
                _vectorizer.target_schema,
                _vectorizer.target_table,
                _constraint_name
            );
            
            RAISE NOTICE 'Dropping foreign key constraint % from %.%', 
                _constraint_name, 
                _vectorizer.target_schema, 
                _vectorizer.target_table;
            
            EXECUTE _sql;
        ELSE
            RAISE NOTICE 'No foreign key constraint found for %.%', 
                _vectorizer.target_schema, 
                _vectorizer.target_table;
        END IF;
    END LOOP;
END;
$block$;

-- dropping in favour of new signatures
drop function if exists ai._vectorizer_create_source_trigger(name,name,name,name,name,jsonb);
drop function if exists ai._vectorizer_create_target_table(name,name,jsonb,name,name,integer,name[]);
$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 019-drop-truncate-from-vectorizer-config-lib.sql
do $outer_migration_block$ /*019-drop-truncate-from-vectorizer-config-lib.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$019-drop-truncate-from-vectorizer-config-lib.sql$migration_name$;
    _migration_body text =
$migration_body$
-- in the extension, this was done in 009-drop-truncate-from-vectorizer-config.sql
-- but that has a mix of extension and vectorizer config changes.
-- so we need to split it out. but put it at the beginning of the lib changes.
-- since it's idempotent and no changes from 009-018 depend on it, the change in order is OK.
UPDATE ai.vectorizer SET config = config #- '{"embedding", "truncate"}' WHERE config @? '$.embedding.truncate';

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 020-add-worker-tracking-table.sql
do $outer_migration_block$ /*020-add-worker-tracking-table.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$020-add-worker-tracking-table.sql$migration_name$;
    _migration_body text =
$migration_body$
CREATE TABLE ai.vectorizer_worker_process(
        id uuid not null primary key default gen_random_uuid()
    ,   version text not null
    ,   started timestamptz not null default now()
    ,   expected_heartbeat_interval interval not null
    ,   last_heartbeat timestamptz not null default now()
    ,   heartbeat_count int not null default 0
    ,   error_count int not null default 0
    ,   success_count int not null default 0
    ,   last_error_at timestamptz null default null
    ,   last_error_message text null default null
);

create index on ai.vectorizer_worker_process (last_heartbeat);


create table ai.vectorizer_worker_progress(
      vectorizer_id int primary key not null references ai.vectorizer (id) on delete cascade
    , success_count int not null default 0
    , error_count int not null default 0
    , last_success_at timestamptz null default null
    -- don't use foreign key here because of three reasons:
    -- 1. we don't want to enforce that the process exists in the process table (we may want to clean up that table independently)
    -- 2. we don't want have any chance this row will fail to be inserted.
    -- 3. we want the insert of this row to be as fast and lightweight as possible.
    , last_success_process_id uuid null default null
    , last_error_at timestamptz null default null
    , last_error_message text null default null
    --see reasons above for why we don't use foreign key here
    , last_error_process_id uuid null default null
);

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 021-drop-create-vectorizer-old-function.sql
do $outer_migration_block$ /*021-drop-create-vectorizer-old-function.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$021-drop-create-vectorizer-old-function.sql$migration_name$;
    _migration_body text =
$migration_body$
-- adding a new jsonb param to include the loader.
drop function if exists ai.create_vectorizer(regclass,name,jsonb,jsonb,jsonb,jsonb,jsonb,jsonb,name,name,name,name,name,name,name[],boolean);
-- adding a new boolean chunk_document to infer if we're validating a chunker that relies on documents.
drop function if exists ai._validate_chunking(jsonb,name,name);

-- dropping the old chunking functions.
drop function if exists ai.chunking_character_text_splitter(name,integer,integer,text,boolean);
drop function if exists ai.chunking_recursive_character_text_splitter(name,integer,integer,text[],boolean);



$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 022-migrate-existing-vectorizers-to-loading.sql
do $outer_migration_block$ /*022-migrate-existing-vectorizers-to-loading.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$022-migrate-existing-vectorizers-to-loading.sql$migration_name$;
    _migration_body text =
$migration_body$
do language plpgsql $block$
DECLARE
    _vectorizer RECORD;
    _chunking jsonb;
    _chunk_column text;
    _config jsonb;
BEGIN
    -- Loop through all vectorizers
    FOR _vectorizer IN SELECT id, config FROM ai.vectorizer
    LOOP
        -- Extract the chunking config and chunk_column
        _chunking := _vectorizer.config operator(pg_catalog.->)'chunking';
        _chunk_column := _chunking operator(pg_catalog.->>)'chunk_column';
        
        IF _chunk_column IS NOT NULL THEN
            -- Create new config:
            -- 1. Add loading config
            -- 2. Add parsing config
            -- 3. Remove chunk_column from chunking config
            _config := _vectorizer.config operator(pg_catalog.||) jsonb_build_object(
                'loading', json_build_object(
                    'implementation', 'column',
                    'config_type', 'loading',
                    'column_name', _chunk_column,
                    'retries', 6
           ),
                'parsing', json_build_object(
                    'implementation', 'auto',
                    'config_type', 'parsing'
                ),
                'chunking', _chunking operator(pg_catalog.-) 'chunk_column',
                'version', '__version__'
            );
            
            -- Update the vectorizer with new config
            UPDATE ai.vectorizer 
            SET config = _config
            WHERE id operator(pg_catalog.=) _vectorizer.id;
        END IF;
    END LOOP;
end;
$block$;
$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 023-migrate-vectorizer-queue-tables.sql
do $outer_migration_block$ /*023-migrate-vectorizer-queue-tables.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$023-migrate-vectorizer-queue-tables.sql$migration_name$;
    _migration_body text =
$migration_body$
do language plpgsql $block$
declare
    _rec pg_catalog.record;
    _sql pg_catalog.text;
begin
    -- loop through all vectorizers to extract queue tables information
    for _rec in (
        select queue_schema, queue_table from ai.vectorizer
    )
    loop

        select pg_catalog.format
               ( $sql$alter table %I.%I
                 add column if not exists loading_retries pg_catalog.int4 not null default 0
                 , add column if not exists loading_retry_after pg_catalog.timestamptz default null$sql$
                 , _rec.queue_schema
                 , _rec.queue_table
               ) into strict _sql;

        raise debug '%', _sql;
        execute _sql;
    end loop;
end;
$block$;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 024-add-vectorizer-queue-failed-table.sql
do $outer_migration_block$ /*024-add-vectorizer-queue-failed-table.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$024-add-vectorizer-queue-failed-table.sql$migration_name$;
    _migration_body text =
$migration_body$
alter table ai.vectorizer add column queue_failed_table name;

$migration_body$;
begin
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 025-migrate-vectorizer-to-have-queue-failed-table.sql
do $outer_migration_block$ /*025-migrate-vectorizer-to-have-queue-failed-table.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$025-migrate-vectorizer-to-have-queue-failed-table.sql$migration_name$;
    _migration_body text =
$migration_body$
do language plpgsql $block$
begin
    update ai.vectorizer
    set queue_failed_table = '_vectorizer_q_failed_' || id;
end
$block$;

do language plpgsql $block$
declare
    _sql pg_catalog.text;
    _vec ai.vectorizer;
    _grant_to text[];
begin
    -- loop through all vectorizers to extract queue tables information
    for _vec in (
        select * from ai.vectorizer
    )
    loop
        select array_agg(distinct(grantee)) into _grant_to
        from (
                 select (aclexplode(k.relacl)).grantee::regrole::text as grantee
                 from pg_class k
                     inner join pg_namespace n on (k.relnamespace = n.oid)
                 where k.relname = _vec.queue_table
                   and n.nspname = _vec.queue_schema
             ) as grants
        ;

        -- if no grantees found, use a sensible default or leave it null
        if _grant_to is null then
            _grant_to := '{}';
        end if;
        select pg_catalog.format
        ( $sql$
        create table %I.%I
        ( %s
        , created_at pg_catalog.timestamptz not null default now()
        , failure_step pg_catalog.text not null default ''
        )
        $sql$
        , _vec.queue_schema, _vec.queue_failed_table
        , (
            select pg_catalog.string_agg
            ( pg_catalog.format
              ( '%I %s not null'
              , x.attname
              , x.typname
              )
            , e'\n, '
            order by x.attnum
            )
            from pg_catalog.jsonb_to_recordset(_vec.source_pk) x(attnum int, attname name, typname name)
        )
        ) into strict _sql
        ;
        execute _sql;

        -- create the index
        select pg_catalog.format
        ( $sql$create index on %I.%I (%s)$sql$
        , _vec.queue_schema, _vec.queue_failed_table
        , (
            select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
            from pg_catalog.jsonb_to_recordset(_vec.source_pk) x(pknum int, attname name)
          )
        ) into strict _sql
        ;
        execute _sql;


        -- apply permissions if we found grantees
        if array_length(_grant_to, 1) > 0 then
        -- grant usage on queue schema to identified roles
        select pg_catalog.format
               ( $sql$grant usage on schema %I to %s$sql$
                   , _vec.queue_schema
                   , (
                     select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
                     from pg_catalog.unnest(_grant_to) x
                 )
               ) into strict _sql;

        execute _sql;

        -- grant select, update, delete on queue table to identified roles
        select pg_catalog.format
               ( $sql$grant select, insert, update, delete on %I.%I to %s$sql$
                    , _vec.queue_schema
                   , _vec.queue_failed_table
                   , (
                     select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
                     from pg_catalog.unnest(_grant_to) x
                 )
               ) into strict _sql;

        execute _sql;
        end if;

    end loop;
end $block$
;

$migration_body$;
begin
    -- NOTE(review): the body collects grantees via aclexplode(relacl).grantee::regrole::text.
    -- a PUBLIC grant (grantee oid 0) appears to render as '-', which would make the generated
    -- GRANT statement fail. the body text is deliberately left as-is because it is stored
    -- and compared on later runs. confirm whether queue tables can carry PUBLIC grants.
    --
    -- a migration is applied at most once; a row in ai.pgai_lib_migration marks it as done.
    -- RAISE substitutes each bare % with the next argument.
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration % already applied. skipping.', _migration_name;
        -- the stored text is never re-run; only warn when the shipped body has diverged from it
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%" have changed', _migration_name;
        end if;
        return;
    end if;
    -- run the body as the statement list of its own plpgsql DO block tagged with the
    -- migration name, then record the exact body text as applied
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 026-change_trigger_definition.sql
-- One-time migration. For every row of ai.vectorizer the body:
--   1. replaces the queue-schema trigger function with a placeholder whose
--      body only raises (its message says the idempotent code is expected to
--      redefine it);
--   2. drops the row trigger and the "<trigger_name>_truncate" trigger on the
--      source table, if present;
--   3. recreates an AFTER INSERT/UPDATE/DELETE row trigger and an AFTER
--      TRUNCATE statement trigger, both executing that same function.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it (not even
-- whitespace); ship follow-up changes as a new migration.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*026-change_trigger_definition.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$026-change_trigger_definition.sql$migration_name$;
    _migration_body text =
$migration_body$
-- This code block recreates all triggers for vectorizers to make sure
-- they have the most recent version of the trigger function
do $upgrade_block$
declare
    _vec record;
begin
    -- Find all vectorizers
    for _vec in (
        select 
            v.id,
            v.source_schema,
            v.source_table,
            v.source_pk,
            v.target_schema,
            v.target_table,
            v.trigger_name,
            v.queue_schema,
            v.queue_table,
            v.config
        from ai.vectorizer v
    )
    loop
        raise notice 'Recreating trigger function for vectorizer ID %s', _vec.id;

        execute format
        (
        --weird indent is intentional to make the sql functions look the same as during a fresh install
        --otherwise the snapshots will not match during upgrade testing.
            $sql$
    create or replace function %I.%I() returns trigger 
    as $trigger_def$ 
    begin
    raise 'This trigger function should be redefined in the idempotent code'; 
    end 
    $trigger_def$ language plpgsql volatile parallel safe security definer 
    set search_path to pg_catalog, pg_temp
    $sql$
            , _vec.queue_schema, _vec.trigger_name
        );

        execute format(
            'drop trigger if exists %I on %I.%I',
            _vec.trigger_name, _vec.source_schema, _vec.source_table
        );

        execute format(
            'drop trigger if exists %I on %I.%I',
            format('%s_truncate',_vec.trigger_name) , _vec.source_schema, _vec.source_table
        );

        execute format(
            'create trigger %I after insert or update or delete on %I.%I for each row execute function %I.%I()',
            _vec.trigger_name, _vec.source_schema, _vec.source_table, _vec.queue_schema, _vec.trigger_name
        );

        execute format(
            'create trigger %I after truncate on %I.%I for each statement execute function %I.%I()',
            format('%s_truncate',_vec.trigger_name) , _vec.source_schema, _vec.source_table, _vec.queue_schema, _vec.trigger_name
        );
        
        raise info 'Successfully recreated trigger for vectorizer ID %', _vec.id;
    end loop;
end;
$upgrade_block$;

$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 027-drop-loading-uri-old-signature.sql
-- One-time migration: drops the ai.loading_uri(name, int4) overload if it exists.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*027-drop-loading-uri-old-signature.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$027-drop-loading-uri-old-signature.sql$migration_name$;
    _migration_body text =
$migration_body$
drop function if exists ai.loading_uri(pg_catalog.name,pg_catalog.int4);

$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 028-remove-destination-columns.sql
-- One-time migration. The body:
--   * copies each vectorizer's target_schema, target_table, view_schema and
--     view_name into config -> 'destination' (implementation 'table',
--     config_type 'destination');
--   * drops ai.vectorizer_status, the _vectorizer_handle_drops event trigger
--     and the ai._vectorizer_handle_drops function (the body's own comment
--     says the idempotent code recreates them);
--   * drops those four columns from ai.vectorizer, plus several older
--     function signatures (e.g. the previous ai.create_vectorizer overload).
-- NOTE(review): the in-loop comment "Extract the chunking config and
-- chunk_column" is stale (the loop copies destination columns). Leave it:
-- editing the body text would trigger the "contents ... have changed" warning.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*028-remove-destination-columns.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$028-remove-destination-columns.sql$migration_name$;
    _migration_body text =
$migration_body$
do language plpgsql $block$
declare
    _vectorizer RECORD;
    _target_schema text;
    _target_table text;
    _view_schema text;
    _view_name text;
    _config jsonb;
begin
    -- Loop through all vectorizers
    for _vectorizer in select id, target_schema, target_table, view_schema, view_name, config from ai.vectorizer
    loop
        -- Extract the chunking config and chunk_column
        _target_schema := _vectorizer.target_schema;
        _target_table := _vectorizer.target_table;
        _view_schema := _vectorizer.view_schema;
        _view_name := _vectorizer.view_name;

        -- Create new config:
        -- Add destination config
        _config := _vectorizer.config operator(pg_catalog.||) jsonb_build_object(
            'destination', json_build_object(
                'implementation', 'table',
                'config_type', 'destination',
                'target_schema', _target_schema,
                'target_table', _target_table,
                'view_schema', _view_schema,
                'view_name', _view_name
        ));

        -- Update the vectorizer with new config
        update ai.vectorizer 
        set config = _config
        where id operator(pg_catalog.=) _vectorizer.id;
    end loop;
end;
$block$;

-- These will be recreated by the idempotent migrations in new form that work despite the dropped columns
drop view if exists ai.vectorizer_status; 
drop event trigger if exists _vectorizer_handle_drops;
drop function if exists ai._vectorizer_handle_drops;

alter table ai.vectorizer 
    drop column target_schema,
    drop column target_table,
    drop column view_schema,
    drop column view_name;


drop FUNCTION IF EXISTS ai._vectorizer_build_trigger_definition(name,name,name,name,jsonb);
drop FUNCTION IF EXISTS ai.create_vectorizer(regclass,name,jsonb,jsonb,jsonb,jsonb,jsonb,jsonb,name,name,name,name,name,name,name[],boolean);
drop function if exists ai._vectorizer_vector_index_exists(name,name,jsonb);
drop function if exists ai._vectorizer_create_vector_index(name,name,jsonb);
$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 029-add-vectorizer-name-column.sql
-- One-time migration. The body adds ai.vectorizer.name (with a
-- lowercase-identifier CHECK) and back-fills it from config -> 'destination':
--   * 'table' destinations: "<target_schema>_<target_table>";
--   * 'column' destinations:
--     "<source_schema>_<source_table>_<embedding_column>".
-- It then makes name NOT NULL and UNIQUE and drops the int4-id overloads of
-- several vectorizer functions.
-- NOTE(review): the loop selects only id and config, yet the 'column' branch
-- reads _vectorizer.source_schema / source_table. That branch would raise
-- "record has no field" if reached. Migration 028 tagged every existing
-- vectorizer with implementation 'table', so it is presumably unreachable
-- on upgrade; confirm.
-- NOTE(review): a back-filled name containing uppercase or other
-- non-matching characters would presumably fail the CHECK on name. _name is
-- also not reset per iteration, so an unknown destination type would reuse
-- the previous row's name.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*029-add-vectorizer-name-column.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$029-add-vectorizer-name-column.sql$migration_name$;
    _migration_body text =
$migration_body$
alter table ai.vectorizer add column name name check (name ~ '^[a-z][a-z_0-9]*$');

do language plpgsql $block$
declare
    _vectorizer RECORD;
    _destination_type text;
    _target_schema text;
    _target_table text;
    _embedding_column text;
    _config jsonb;
    _name text;
    _destination_config jsonb;
begin
    -- Loop through all vectorizers
    for _vectorizer in select id, config from ai.vectorizer
    loop
        -- Extract the chunking config and chunk_column
        _config := _vectorizer.config;
        _destination_config := _config operator(pg_catalog.->) 'destination';
        _destination_type := _destination_config operator(pg_catalog.->>) 'implementation';
        if _destination_type = 'table' then
            _target_schema := _destination_config operator(pg_catalog.->>) 'target_schema';
            _target_table := _destination_config operator(pg_catalog.->>) 'target_table';
            _name := _target_schema operator(pg_catalog.||) '_' operator(pg_catalog.||) _target_table;
        elseif _destination_type = 'column' then
            _embedding_column := _destination_config operator(pg_catalog.->>) 'embedding_column';
            _name := _vectorizer.source_schema operator(pg_catalog.||) '_' operator(pg_catalog.||) _vectorizer.source_table operator(pg_catalog.||) '_' operator(pg_catalog.||) _embedding_column;
        end if;

        -- Update the vectorizer with new config
        update ai.vectorizer 
        set name = _name
        where id operator(pg_catalog.=) _vectorizer.id;
    end loop;
end;
$block$;

alter table ai.vectorizer alter column name set not null;
alter table ai.vectorizer add constraint vectorizer_name_unique unique (name);

drop function if exists ai.disable_vectorizer_schedule(int4);
drop function if exists ai.enable_vectorizer_schedule(int4);
drop function if exists ai.drop_vectorizer(int4, bool);
drop function if exists ai.vectorizer_queue_pending(int4, bool);
drop function if exists ai.vectorizer_embed(int4, text, text);
$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 030-add_vectorizer_errors_view.sql
-- One-time migration. The body:
--   * renames table ai.vectorizer_errors (and its id/recorded index) to
--     ai._vectorizer_errors;
--   * creates view ai.vectorizer_errors, which adds the vectorizer name via
--     a LEFT JOIN on id;
--   * grants SELECT on the view to each role that holds SELECT on the
--     renamed table (keeping WITH GRANT OPTION) and lacks it on the view.
-- NOTE(review): the view is defined with ve.*, which PostgreSQL expands when
-- the view is created; columns added to ai._vectorizer_errors later will not
-- appear until the view is recreated.
-- NOTE(review): aclexplode reports PUBLIC as grantee 0, which has no
-- pg_roles row, so a PUBLIC grant on the table is presumably not copied.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*030-add_vectorizer_errors_view.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$030-add_vectorizer_errors_view.sql$migration_name$;
    _migration_body text =
$migration_body$
-- rename the ai.vectorizer_errors table to ai._vectorizer_errors
alter table ai.vectorizer_errors rename to _vectorizer_errors;

-- rename the existing index on the ai.vectorizer_error so it follows the right naming convention (adds the _ prefix)
-- this is not strictly necessary, but it is a good practice to keep the naming consistent
alter index ai.vectorizer_errors_id_recorded_idx rename to _vectorizer_errors_id_recorded_idx;

-- create a view including vectorizer name
create or replace view ai.vectorizer_errors as
select 
  ve.*,
  v.name
from
  ai._vectorizer_errors ve
  left join ai.vectorizer v on ve.id = v.id;


-- grant privileges on new ai.vectorizer_errors view
do language plpgsql $block$
declare
    _sql text;
begin
    for _sql in
    (
        -- generate grant commands with SELECT privilege
        select format
        ( $$GRANT SELECT ON ai.vectorizer_errors TO %I%s$$
        , grantee.rolname
        , case when x.is_grantable then ' WITH GRANT OPTION'
          else ''
          end
        )
        from pg_class k
        inner join pg_namespace n on (k.relnamespace = n.oid)
        cross join lateral aclexplode(k.relacl) x
        inner join pg_roles grantee on (grantee.oid = x.grantee)
        where n.nspname = 'ai'
        and k.relname = '_vectorizer_errors' -- copy grants from the old table
        and x.privilege_type = 'SELECT' -- only SELECT privileges, no need others
        and not has_table_privilege -- only grant users with no privileges by default
          ( grantee.oid
          , 'ai.vectorizer_errors'::regclass::oid -- the view
          , case when x.is_grantable then 'SELECT WITH GRANT OPTION'
          else 'SELECT'
          end
          )
    )
    loop
        execute _sql;
    end loop;
end
$block$;
$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 031-semantic-catalog.sql
-- One-time migration. The body creates:
--   * ai.semantic_catalog: one row per catalog, with name[] columns
--     obj_table, sql_table and fact_table;
--   * ai.semantic_catalog_embedding: named jsonb embedding configs per
--     catalog, deleted with their catalog (ON DELETE CASCADE).
-- NOTE(review): the CHECKs on obj_table and sql_table both test
-- array_length(fact_table, 1), which looks like a copy-paste error.
-- PostgreSQL ignores the declared [2] array size, so obj_table and sql_table
-- have no length constraint at all. The fix needs a NEW migration that drops
-- and re-adds those constraints. Editing this body would not change
-- databases that already applied it, and would trigger the
-- "contents ... have changed" warning.
-- Runner (same for every numbered migration): if the name is already in
-- ai.pgai_lib_migration the migration is skipped, with a warning when the
-- stored body differs; otherwise the body is executed inside a DO block and
-- recorded. The body text is compared verbatim, so never edit it.
-- NOTE(review): RAISE substitutes a bare %, so the runner's '%s' prints the
-- name followed by a stray "s"; fix this in the generator that emits the runner.
do $outer_migration_block$ /*031-semantic-catalog.sql*/
declare
    _sql text;
    _migration record;
    _migration_name text = $migration_name$031-semantic-catalog.sql$migration_name$;
    _migration_body text =
$migration_body$

create table ai.semantic_catalog
( id int4 not null primary key generated by default as identity 
, catalog_name name not null unique check (catalog_name ~ '^[a-z][a-z_0-9]*$')
, obj_table name[2] not null check(array_length(fact_table, 1) = 2)
, sql_table name[2] not null check(array_length(fact_table, 1) = 2)
, fact_table name[2] not null check(array_length(fact_table, 1) = 2)
);

create table ai.semantic_catalog_embedding
( id int4 not null primary key generated by default as identity
, semantic_catalog_id int4 not null references ai.semantic_catalog (id) on delete cascade
, embedding_name name not null check (embedding_name ~ '^[a-z][a-z_0-9]*$')
, config jsonb not null check (jsonb_typeof(config) = 'object')
, unique (semantic_catalog_id, embedding_name)
);

$migration_body$;
begin
    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
    if _migration is not null then
        raise notice 'migration %s already applied. skipping.', _migration_name;
        if _migration.body operator(pg_catalog.!=) _migration_body then
            raise warning 'the contents of migration "%s" have changed', _migration_name;
        end if;
        return;
    end if;
    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
    execute _sql;
    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
    values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

--------------------------------------------------------------------------------
-- 001-chunking.sql

-------------------------------------------------------------------------------
-- chunking_character_text_splitter
-- Builds the chunking config for the 'character_text_splitter'
-- implementation. Arguments passed as null are dropped from the resulting
-- object (json_strip_nulls) instead of being stored as JSON null.
create or replace function ai.chunking_character_text_splitter
( chunk_size pg_catalog.int4 default 800
, chunk_overlap pg_catalog.int4 default 400
, separator pg_catalog.text default E'\n\n'
, is_separator_regex pg_catalog.bool default false
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'chunking'
      , 'implementation', 'character_text_splitter'
      , 'separator', separator
      , 'is_separator_regex', is_separator_regex
      , 'chunk_size', chunk_size
      , 'chunk_overlap', chunk_overlap
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- chunking_recursive_character_text_splitter
-- Builds the chunking config for the 'recursive_character_text_splitter'
-- implementation. separators is stored as a JSON array; null arguments are
-- omitted from the result.
create or replace function ai.chunking_recursive_character_text_splitter
( chunk_size pg_catalog.int4 default 800
, chunk_overlap pg_catalog.int4 default 400
, separators pg_catalog.text[] default array[E'\n\n', E'\n', '.', '?', '!', ' ', '']
, is_separator_regex pg_catalog.bool default false
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'chunking'
      , 'implementation', 'recursive_character_text_splitter'
      , 'separators', separators
      , 'is_separator_regex', is_separator_regex
      , 'chunk_size', chunk_size
      , 'chunk_overlap', chunk_overlap
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- chunking_none
-- Returns the fixed chunking config whose implementation is 'none'.
create or replace function ai.chunking_none() returns pg_catalog.jsonb
as $func$
    select '{"config_type": "chunking", "implementation": "none"}'::pg_catalog.jsonb
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_chunking
-- Raises unless config is a jsonb object with config_type 'chunking' and an
-- implementation of 'character_text_splitter',
-- 'recursive_character_text_splitter' or 'none'.
create or replace function ai._validate_chunking
( config pg_catalog.jsonb ) returns void
as $func$
declare
    _type_tag pg_catalog.text;
    _impl pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.!=) 'object' then
        raise exception 'chunking config is not a jsonb object';
    end if;

    -- "is distinct from" treats a missing (null) config_type as a mismatch
    _type_tag = config operator(pg_catalog.->>) 'config_type';
    if _type_tag is distinct from 'chunking' then
        raise exception 'invalid config_type for chunking config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null
    or not (_impl operator(pg_catalog.=) any(array['character_text_splitter', 'recursive_character_text_splitter', 'none'])) then
        raise exception 'invalid chunking config implementation';
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 002-formatting.sql

-------------------------------------------------------------------------------
-- formatting_python_template
-- Builds the 'python_template' formatting config. The template defaults to
-- the bare $chunk placeholder; a null template is omitted from the result.
create or replace function ai.formatting_python_template(template pg_catalog.text default '$chunk') returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'formatting'
      , 'implementation', 'python_template'
      , 'template', template
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_formatting_python_template
-- Checks a 'python_template' formatting config against its source table.
-- It raises when:
--   * the template is missing or null, or lacks the $chunk placeholder;
--   * the source table has a user column named "chunk".
-- Fix: a missing or null "template" previously passed silently. like(null, ...)
-- yields null, and an "if not <null>" branch never fires.
create or replace function ai._validate_formatting_python_template
( config pg_catalog.jsonb
, source_schema pg_catalog.name
, source_table pg_catalog.name
) returns void
as $func$
declare
    _template pg_catalog.text;
begin
    _template = config operator(pg_catalog.->>) 'template';
    -- reject null explicitly; otherwise the like() test below evaluates to
    -- null and the raise is skipped
    if _template is null or not pg_catalog.like(_template, '%$chunk%') then
        raise exception 'template must contain $chunk placeholder';
    end if;

    -- attnum > 0 restricts the check to user columns (system columns are negative)
    if exists
    ( select 1
      from pg_catalog.pg_class k
      inner join pg_catalog.pg_namespace n on (k.relnamespace = n.oid)
      inner join pg_catalog.pg_attribute a on (k.oid = a.attrelid)
      where n.nspname operator(pg_catalog.=) source_schema
      and k.relname operator(pg_catalog.=) source_table
      and a.attnum operator(pg_catalog.>) 0
      and a.attname operator(pg_catalog.=) 'chunk'
    ) then
        raise exception 'formatting_python_template may not be used when source table has a column named "chunk"';
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_formatting
-- Validates a formatting config. It must be a jsonb object with
-- config_type 'formatting' and a supported implementation. For
-- 'python_template' it delegates to ai._validate_formatting_python_template,
-- which also inspects the source table.
--
-- Declared STABLE (was IMMUTABLE). The python_template check reads
-- pg_catalog, so the outcome depends on database state. IMMUTABLE would allow
-- the planner to pre-evaluate calls that have constant arguments.
create or replace function ai._validate_formatting
( config pg_catalog.jsonb
, source_schema pg_catalog.name
, source_table pg_catalog.name
) returns void
as $func$
declare
    _config_type pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.!=) 'object' then
        raise exception 'formatting config is not a jsonb object';
    end if;

    _config_type = config operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type operator(pg_catalog.!=) 'formatting' then
        raise exception 'invalid config_type for formatting config';
    end if;

    case config operator(pg_catalog.->>) 'implementation'
        when 'python_template' then
            perform ai._validate_formatting_python_template
            ( config
            , source_schema
            , source_table
            );
        else
            raise exception 'unrecognized formatting implementation';
    end case;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 003-scheduling.sql

-------------------------------------------------------------------------------
-- scheduling_none
-- Returns the fixed scheduling config whose implementation is 'none'.
create or replace function ai.scheduling_none() returns pg_catalog.jsonb
as $func$
    select '{"config_type": "scheduling", "implementation": "none"}'::pg_catalog.jsonb
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- scheduling_default
-- Returns the placeholder scheduling config whose implementation is
-- 'default'. ai._validate_scheduling does not accept 'default', so
-- presumably callers resolve it (e.g. via ai._resolve_scheduling_default)
-- before validating; confirm.
create or replace function ai.scheduling_default() returns pg_catalog.jsonb
as $func$
    select '{"config_type": "scheduling", "implementation": "default"}'::pg_catalog.jsonb
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- scheduling_timescaledb
-- Builds the 'timescaledb' scheduling config. schedule_interval defaults to
-- five minutes; initial_start, fixed_schedule and timezone are only
-- included when non-null.
create or replace function ai.scheduling_timescaledb
( schedule_interval pg_catalog.interval default interval '5m'
, initial_start pg_catalog.timestamptz default null
, fixed_schedule pg_catalog.bool default null
, timezone pg_catalog.text default null
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'scheduling'
      , 'implementation', 'timescaledb'
      , 'schedule_interval', schedule_interval
      , 'initial_start', initial_start
      , 'fixed_schedule', fixed_schedule
      , 'timezone', timezone
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _resolve_scheduling_default
-- Chooses a concrete scheduling config from the ai.scheduling_default
-- setting. 'scheduling_timescaledb' selects ai.scheduling_timescaledb().
-- Any other value, or an unset setting (current_setting with missing_ok
-- returns null), falls back to ai.scheduling_none().
create or replace function ai._resolve_scheduling_default() returns pg_catalog.jsonb
as $func$
declare
    _wanted pg_catalog.text = pg_catalog.current_setting('ai.scheduling_default', true);
begin
    if _wanted operator(pg_catalog.=) 'scheduling_timescaledb' then
        return ai.scheduling_timescaledb();
    end if;
    return ai.scheduling_none();
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_scheduling
-- Raises unless config is a jsonb object with config_type 'scheduling' and
-- implementation 'none' or 'timescaledb'. A missing implementation and an
-- unrecognized one produce different messages.
create or replace function ai._validate_scheduling(config pg_catalog.jsonb) returns void
as $func$
declare
    _kind pg_catalog.text;
    _impl pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.!=) 'object' then
        raise exception 'scheduling config is not a jsonb object';
    end if;

    _kind = config operator(pg_catalog.->>) 'config_type';
    if _kind is distinct from 'scheduling' then
        raise exception 'invalid config_type for scheduling config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'scheduling implementation not specified';
    elsif _impl not in ('none', 'timescaledb') then
        raise exception 'unrecognized scheduling implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 004-embedding.sql

-------------------------------------------------------------------------------
-- embedding_openai
-- Builds the 'openai' embedding config. chat_user is stored under the
-- "user" key, and null arguments are omitted from the result.
create or replace function ai.embedding_openai
( model pg_catalog.text
, dimensions pg_catalog.int4
, chat_user pg_catalog.text default null
, api_key_name pg_catalog.text default 'OPENAI_API_KEY'
, base_url text default null
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'embedding'
      , 'implementation', 'openai'
      , 'model', model
      , 'dimensions', dimensions
      , 'api_key_name', api_key_name
      , 'base_url', base_url
      , 'user', chat_user
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- embedding_ollama
-- Builds the 'ollama' embedding config. options is embedded as a nested
-- JSON value; null arguments are omitted from the result.
create or replace function ai.embedding_ollama
( model pg_catalog.text
, dimensions pg_catalog.int4
, base_url pg_catalog.text default null
, options pg_catalog.jsonb default null
, keep_alive pg_catalog.text default null
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'embedding'
      , 'implementation', 'ollama'
      , 'model', model
      , 'dimensions', dimensions
      , 'keep_alive', keep_alive
      , 'options', options
      , 'base_url', base_url
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- embedding_voyageai
-- Builds the 'voyageai' embedding config. input_type must be 'query' or
-- 'document' when given; a null input_type is accepted and omitted.
create or replace function ai.embedding_voyageai
( model pg_catalog.text
, dimensions pg_catalog.int4
, input_type pg_catalog.text default 'document'
, api_key_name pg_catalog.text default 'VOYAGE_API_KEY'
) returns pg_catalog.jsonb
as $func$
begin
    -- validated against a fixed list rather than an enum type (enums were
    -- deliberately avoided). "<> all" is null, and so passes, for a null input_type.
    if input_type operator(pg_catalog.<>) all(array['query', 'document']) then
        raise exception 'invalid input_type for voyage ai "%"', input_type;
    end if;

    return pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'embedding'
      , 'implementation', 'voyageai'
      , 'model', model
      , 'dimensions', dimensions
      , 'api_key_name', api_key_name
      , 'input_type', input_type
      )
    );
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- embedding_litellm
-- Builds the 'litellm' embedding config. extra_options is embedded as a
-- nested JSON value; null arguments are omitted from the result.
create or replace function ai.embedding_litellm
( model pg_catalog.text
, dimensions pg_catalog.int4
, api_key_name pg_catalog.text default null
, extra_options pg_catalog.jsonb default null
) returns pg_catalog.jsonb
as $func$
begin
    return pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'embedding'
      , 'implementation', 'litellm'
      , 'model', model
      , 'dimensions', dimensions
      , 'extra_options', extra_options
      , 'api_key_name', api_key_name
      )
    );
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_embedding
-- Raises unless config is a jsonb object with config_type 'embedding' and
-- an implementation of 'openai', 'ollama', 'voyageai' or 'litellm'. A
-- missing implementation and an unknown one produce different messages.
create or replace function ai._validate_embedding(config pg_catalog.jsonb) returns void
as $func$
declare
    _kind pg_catalog.text;
    _impl pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.!=) 'object' then
        raise exception 'embedding config is not a jsonb object';
    end if;

    _kind = config operator(pg_catalog.->>) 'config_type';
    if _kind is distinct from 'embedding' then
        raise exception 'invalid config_type for embedding config';
    end if;

    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'embedding implementation not specified';
    elsif _impl not in ('openai', 'ollama', 'voyageai', 'litellm') then
        raise exception 'invalid embedding implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 005-indexing.sql

-------------------------------------------------------------------------------
-- indexing_none
-- Returns the fixed indexing config whose implementation is 'none'.
create or replace function ai.indexing_none() returns pg_catalog.jsonb
as $func$
    select '{"config_type": "indexing", "implementation": "none"}'::pg_catalog.jsonb
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_default
-- Returns the placeholder indexing config whose implementation is
-- 'default'. It is presumably resolved to a concrete config elsewhere
-- (see ai._resolve_indexing_default); confirm.
create or replace function ai.indexing_default() returns pg_catalog.jsonb
as $func$
    select '{"config_type": "indexing", "implementation": "default"}'::pg_catalog.jsonb
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_diskann
-- Builds the 'diskann' indexing config. min_rows (default 100000) and
-- create_when_queue_empty (default true) are always present unless passed
-- as null. The tuning parameters are only included when non-null.
create or replace function ai.indexing_diskann
( min_rows pg_catalog.int4 default 100000
, storage_layout pg_catalog.text default null
, num_neighbors pg_catalog.int4 default null
, search_list_size pg_catalog.int4 default null
, max_alpha pg_catalog.float8 default null
, num_dimensions pg_catalog.int4 default null
, num_bits_per_dimension pg_catalog.int4 default null
, create_when_queue_empty pg_catalog.bool default true
) returns pg_catalog.jsonb
as $func$
    select pg_catalog.json_strip_nulls
    ( pg_catalog.json_build_object
      ( 'config_type', 'indexing'
      , 'implementation', 'diskann'
      , 'min_rows', min_rows
      , 'create_when_queue_empty', create_when_queue_empty
      , 'storage_layout', storage_layout
      , 'num_neighbors', num_neighbors
      , 'search_list_size', search_list_size
      , 'max_alpha', max_alpha
      , 'num_dimensions', num_dimensions
      , 'num_bits_per_dimension', num_bits_per_dimension
      )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _resolve_indexing_default
create or replace function ai._resolve_indexing_default() returns pg_catalog.jsonb
as $func$
begin
    -- ai.indexing_default (read with missing_ok, so null when unset) names the
    -- indexing constructor to use; any other value falls back to indexing_none
    return case pg_catalog.current_setting('ai.indexing_default', true)
        when 'indexing_diskann' then ai.indexing_diskann()
        when 'indexing_hnsw' then ai.indexing_hnsw()
        else ai.indexing_none()
    end;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing_diskann
create or replace function ai._validate_indexing_diskann(config pg_catalog.jsonb) returns void
as $func$
declare
    _layout pg_catalog.text = config operator(pg_catalog.->>) 'storage_layout';
begin
    -- storage_layout is optional; nothing to check when it is absent
    if _layout is null then
        return;
    end if;

    -- when supplied it must be one of the two known layouts
    if _layout operator(pg_catalog.<>) all(array['memory_optimized', 'plain']) then
        raise exception 'invalid storage_layout';
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- indexing_hnsw
create or replace function ai.indexing_hnsw
( min_rows pg_catalog.int4 default 100000
, opclass pg_catalog.text default 'vector_cosine_ops'
, m pg_catalog.int4 default null
, ef_construction pg_catalog.int4 default null
, create_when_queue_empty pg_catalog.bool default true
) returns pg_catalog.jsonb
as $func$
    -- hnsw indexing config; keys whose argument is null are left out
    select pg_catalog.jsonb_strip_nulls
    (
        pg_catalog.jsonb_build_object
        ( 'config_type', 'indexing'
        , 'implementation', 'hnsw'
        , 'min_rows', min_rows
        , 'opclass', opclass
        , 'm', m
        , 'ef_construction', ef_construction
        , 'create_when_queue_empty', create_when_queue_empty
        )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing_hnsw
create or replace function ai._validate_indexing_hnsw(config pg_catalog.jsonb) returns void
as $func$
declare
    _ops pg_catalog.text = config operator(pg_catalog.->>) 'opclass';
begin
    -- opclass is optional; when present it must be one of the accepted operator classes
    if _ops is null then
        return;
    end if;

    if _ops operator(pg_catalog.<>) all(array['vector_ip_ops', 'vector_cosine_ops', 'vector_l1_ops']) then
        raise exception 'invalid opclass';
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_indexing
create or replace function ai._validate_indexing(config pg_catalog.jsonb) returns void
as $func$
declare
    _impl pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.<>) 'object' then
        raise exception 'indexing config is not a jsonb object';
    end if;

    -- config_type must be present and equal to 'indexing'
    if (config operator(pg_catalog.->>) 'config_type') is distinct from 'indexing' then
        raise exception 'invalid config_type for indexing config';
    end if;

    -- dispatch on implementation; the specific validators check their own keys
    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'indexing implementation not specified';
    elsif _impl operator(pg_catalog.=) 'none' then
        null; -- no further settings to check
    elsif _impl operator(pg_catalog.=) 'diskann' then
        perform ai._validate_indexing_diskann(config);
    elsif _impl operator(pg_catalog.=) 'hnsw' then
        perform ai._validate_indexing_hnsw(config);
    else
        raise exception 'invalid indexing implementation: "%"', _impl;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;



--------------------------------------------------------------------------------
-- 006-processing.sql

-------------------------------------------------------------------------------
-- processing_default
create or replace function ai.processing_default
( batch_size pg_catalog.int4 default null
, concurrency pg_catalog.int4 default null
) returns pg_catalog.jsonb
as $func$
    -- default processing config; batch_size/concurrency are omitted when null
    select pg_catalog.jsonb_strip_nulls
    (
        pg_catalog.jsonb_build_object
        ( 'config_type', 'processing'
        , 'implementation', 'default'
        , 'batch_size', batch_size
        , 'concurrency', concurrency
        )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_processing
create or replace function ai._validate_processing(config pg_catalog.jsonb) returns void
as $func$
declare
    _impl pg_catalog.text;
    _setting pg_catalog.jsonb;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.<>) 'object' then
        raise exception 'processing config is not a jsonb object';
    end if;

    -- config_type must be present and equal to 'processing'
    if (config operator(pg_catalog.->>) 'config_type') is distinct from 'processing' then
        raise exception 'invalid config_type for processing config';
    end if;

    -- 'default' is the only recognised implementation; reject anything else up front
    _impl = config operator(pg_catalog.->>) 'implementation';
    if _impl is null then
        raise exception 'processing implementation not specified';
    elsif _impl operator(pg_catalog.<>) 'default' then
        raise exception 'unrecognized processing implementation: "%"', _impl;
    end if;

    -- batch_size: optional; when present a json number in the range [1, 2048]
    _setting = config operator(pg_catalog.->) 'batch_size';
    if _setting is not null then
        if pg_catalog.jsonb_typeof(_setting) operator(pg_catalog.<>) 'number' then
            raise exception 'batch_size must be a number';
        elsif cast(_setting as pg_catalog.int4) operator(pg_catalog.>) 2048 then
            raise exception 'batch_size must be less than or equal to 2048';
        elsif cast(_setting as pg_catalog.int4) operator(pg_catalog.<) 1 then
            raise exception 'batch_size must be greater than 0';
        end if;
    end if;

    -- concurrency: optional; when present a json number in the range [1, 50]
    _setting = config operator(pg_catalog.->) 'concurrency';
    if _setting is not null then
        if pg_catalog.jsonb_typeof(_setting) operator(pg_catalog.<>) 'number' then
            raise exception 'concurrency must be a number';
        elsif cast(_setting as pg_catalog.int4) operator(pg_catalog.>) 50 then
            raise exception 'concurrency must be less than or equal to 50';
        elsif cast(_setting as pg_catalog.int4) operator(pg_catalog.<) 1 then
            raise exception 'concurrency must be greater than 0';
        end if;
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 007-grant-to.sql
-------------------------------------------------------------------------------
-- grant_to
create or replace function ai.grant_to(variadic grantees pg_catalog.name[]) returns pg_catalog.name[]
as $func$
    -- de-duplicated union of the explicit grantees and the comma-separated role
    -- list in the ai.grant_to_default setting (missing_ok, so no rows when unset).
    -- returns an empty array, never null, when there is no grantee at all.
    select coalesce(pg_catalog.array_agg(cast(x as pg_catalog.name)), array[]::pg_catalog.name[])
    from (
        select pg_catalog.unnest(grantees) x
        union
        select trim(pg_catalog.string_to_table(pg_catalog.current_setting('ai.grant_to_default', true), ',')) x
    ) _
    -- null entries and blank entries (e.g. from a trailing comma such as 'a,b,')
    -- cannot name a role and would produce invalid GRANT statements downstream
    where x is not null
    and x operator(pg_catalog.!=) '';
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- grant_to
create or replace function ai.grant_to() returns pg_catalog.name[]
as $func$
    -- zero-argument convenience form: delegate to the variadic ai.grant_to
    -- with an empty explicit grantee list
    select ai.grant_to(variadic cast('{}' as pg_catalog.name[]))
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 008-loading.sql
-------------------------------------------------------------------------------
-- loading_column
create or replace function ai.loading_column
( column_name pg_catalog.name
, retries pg_catalog.int4 default 6)
returns pg_catalog.jsonb
as $func$
    -- column loading config; nulls are NOT stripped, so a null argument is kept
    -- as an explicit json null
    select pg_catalog.jsonb_build_object
    ( 'config_type', 'loading'
    , 'implementation', 'column'
    , 'column_name', column_name
    , 'retries', retries
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- loading_uri
create or replace function ai.loading_uri
( column_name pg_catalog.name
, retries pg_catalog.int4 default 6
, aws_role_arn pg_catalog.text default null)
returns pg_catalog.jsonb
as $func$
    -- uri loading config; keys whose argument is null (e.g. no aws_role_arn) are dropped
    select pg_catalog.jsonb_strip_nulls
    (
        pg_catalog.jsonb_build_object
        ( 'config_type', 'loading'
        , 'implementation', 'uri'
        , 'column_name', column_name
        , 'retries', retries
        , 'aws_role_arn', aws_role_arn
        )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_loading
create or replace function ai._validate_loading
( config pg_catalog.jsonb
, source_schema pg_catalog.name
, source_table pg_catalog.name
) returns void
as $func$
-- Validates a loading config against the source table.
-- Checks the object shape, config_type, implementation ('column' or 'uri'),
-- column_name, retries and the optional aws_role_arn. It then checks that the
-- named column exists on source_schema.source_table with a supported type.
-- Raises an exception describing the first problem found.
declare
    _config_type pg_catalog.text;
    _implementation pg_catalog.text;
    _column_name pg_catalog.name;
    _column_type pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(config) operator(pg_catalog.!=) 'object' then
        raise exception 'loading config is not a jsonb object';
    end if;

    _config_type = config operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type operator(pg_catalog.!=) 'loading' then
        raise exception 'invalid config_type for loading config';
    end if;

    _implementation = config operator(pg_catalog.->>) 'implementation';
    if _implementation is null or _implementation not in ('column', 'uri') then
        raise exception 'invalid loading config implementation';
    end if;

    _column_name = config operator(pg_catalog.->>) 'column_name';
    if _column_name is null then
        raise exception 'invalid loading config, missing column_name';
    end if;

    if (config operator(pg_catalog.->>) 'retries') is null or (config operator(pg_catalog.->>) 'retries')::int < 0 then
        raise exception 'invalid loading config, retries must be a non-negative integer';
    end if;

    if (config operator(pg_catalog.->>) 'aws_role_arn') is not null and (config operator(pg_catalog.->>) 'aws_role_arn') not like 'arn:aws:iam::%:role/%' then
        raise exception 'invalid loading config, aws_role_arn must match arn:aws:iam::*:role/*';
    end if;

    -- type name of the (non-dropped) source column the config points at
    select y.typname into _column_type
    from pg_catalog.pg_class k
        inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
        inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
        inner join pg_catalog.pg_type y on (a.atttypid operator(pg_catalog.=) y.oid)
    where n.nspname operator(pg_catalog.=) source_schema
        and k.relname operator(pg_catalog.=) source_table
        and a.attnum operator(pg_catalog.>) 0
        and a.attname operator(pg_catalog.=) _column_name
        and not a.attisdropped;

    if _column_type is null then
        raise exception 'column_name in config does not exist in the table: %', _column_name;
    end if;

    if _column_type not in ('text', 'varchar', 'char', 'bpchar', 'bytea') then
        raise exception 'column_name % in config is of invalid type %. Supported types are: text, varchar, char, bpchar, bytea', _column_name, _column_type;
    end if;

    -- uri loading accepts only the textual types; bytea (allowed above for
    -- column loading) is rejected here, so the message must not list it
    if _implementation = 'uri' and _column_type not in ('text', 'varchar', 'char', 'bpchar') then
        raise exception 'the type of the column `%` in config is not compatible with `uri` loading '
        'implementation (type should be either text, varchar, char, or bpchar)', _column_name;
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 009-parsing.sql
-------------------------------------------------------------------------------
-- parsing_auto
create or replace function ai.parsing_auto() returns pg_catalog.jsonb
as $func$
    -- constant parsing config selecting the 'auto' implementation
    select cast('{"implementation": "auto", "config_type": "parsing"}' as pg_catalog.jsonb)
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- parsing_none
create or replace function ai.parsing_none() returns pg_catalog.jsonb
as $func$
    -- constant parsing config selecting the 'none' implementation
    select cast('{"implementation": "none", "config_type": "parsing"}' as pg_catalog.jsonb)
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- parsing_pymupdf
create or replace function ai.parsing_pymupdf() returns pg_catalog.jsonb
as $func$
    -- constant parsing config selecting the 'pymupdf' implementation
    select cast('{"implementation": "pymupdf", "config_type": "parsing"}' as pg_catalog.jsonb)
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- parsing_docling
create or replace function ai.parsing_docling() returns pg_catalog.jsonb
as $func$
    -- constant parsing config selecting the 'docling' implementation
    select cast('{"implementation": "docling", "config_type": "parsing"}' as pg_catalog.jsonb)
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_parsing
create or replace function ai._validate_parsing
( parsing pg_catalog.jsonb
, loading pg_catalog.jsonb
, source_schema pg_catalog.name
, source_table pg_catalog.name
) returns void
as $func$
-- Validates a parsing config and its compatibility with the loading config and
-- the type of the loaded source column. Raises on the first problem found.
declare
    _column_type pg_catalog.name;
    _config_type pg_catalog.text;
    _loading_implementation pg_catalog.text;
    _parsing_implementation pg_catalog.text;
begin
    -- Basic structure validation
    if pg_catalog.jsonb_typeof(parsing) operator(pg_catalog.!=) 'object' then
        raise exception 'parsing config is not a jsonb object';
    end if;

    -- Validate config_type
    _config_type = parsing operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type operator(pg_catalog.!=) 'parsing' then
        raise exception 'invalid config_type for parsing config';
    end if;

    -- Get implementations
    _loading_implementation = loading operator(pg_catalog.->>) 'implementation';
    -- Skip validation of loading implementation since it's done in _validate_loading

    -- a missing implementation must be rejected explicitly: `null not in (...)`
    -- evaluates to null, which an IF treats as false (i.e. "valid")
    _parsing_implementation = parsing operator(pg_catalog.->>) 'implementation';
    if _parsing_implementation is null
    or _parsing_implementation not in ('auto', 'none', 'pymupdf', 'docling') then
        raise exception 'invalid parsing config implementation';
    end if;

    -- Get the type of the loaded (non-dropped) column once
    select y.typname
    into _column_type
    from pg_catalog.pg_class k
        inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
        inner join pg_catalog.pg_attribute a on (k.oid operator(pg_catalog.=) a.attrelid)
        inner join pg_catalog.pg_type y on (a.atttypid operator(pg_catalog.=) y.oid)
    where n.nspname operator(pg_catalog.=) source_schema
    and k.relname operator(pg_catalog.=) source_table
    and a.attnum operator(pg_catalog.>) 0
    and a.attname operator(pg_catalog.=) (loading operator(pg_catalog.->>) 'column_name')
    and not a.attisdropped;

    -- Validate all combinations
    if _parsing_implementation = 'none' and _column_type = 'bytea' then
        raise exception 'cannot use parsing_none with bytea columns';
    end if;

    if _loading_implementation = 'column' and _parsing_implementation in ('pymupdf', 'docling')
       and _column_type != 'bytea' then
        raise exception 'parsing_% must be used with a bytea column', _parsing_implementation;
    end if;

end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp;


--------------------------------------------------------------------------------
-- 010-destination.sql
-------------------------------------------------------------------------------
-- destination_table
create or replace function ai.destination_table
(
    destination pg_catalog.name default null
    , target_schema pg_catalog.name default null
    , target_table pg_catalog.name default null
    , view_schema pg_catalog.name default null
    , view_name pg_catalog.name default null
) returns pg_catalog.jsonb
as $func$
    -- table destination config; names left null are dropped from the object
    select pg_catalog.jsonb_strip_nulls
    (
        pg_catalog.jsonb_build_object
        ( 'config_type', 'destination'
        , 'implementation', 'table'
        , 'destination', destination
        , 'target_schema', target_schema
        , 'target_table', target_table
        , 'view_schema', view_schema
        , 'view_name', view_name
        )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- destination_column
create or replace function ai.destination_column
(
    embedding_column pg_catalog.name
) returns pg_catalog.jsonb
as $func$
    -- column destination config; embedding_column is dropped when null
    select pg_catalog.jsonb_strip_nulls
    (
        pg_catalog.jsonb_build_object
        ( 'config_type', 'destination'
        , 'implementation', 'column'
        , 'embedding_column', embedding_column
        )
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _validate_destination
create or replace function ai._validate_destination
( destination pg_catalog.jsonb
, chunking pg_catalog.jsonb ) returns void
as $func$
-- Validates a destination config and its compatibility with the chunking config.
-- Raises when the config is not an object, has the wrong config_type, or is a
-- column destination whose chunking implementation is not 'none'.
declare
    _config_type pg_catalog.text;
begin
    if pg_catalog.jsonb_typeof(destination) operator(pg_catalog.!=) 'object' then
        raise exception 'destination config is not a jsonb object';
    end if;

    _config_type = destination operator(pg_catalog.->>) 'config_type';
    if _config_type is null or _config_type operator(pg_catalog.!=) 'destination' then
        raise exception 'invalid config_type for destination config';
    end if;

    -- a column destination requires chunking to be explicitly 'none'.
    -- `is distinct from` also rejects a chunking config with no implementation;
    -- a plain `!=` would yield null there and silently let it through.
    if destination operator(pg_catalog.->>) 'implementation' operator(pg_catalog.=) 'column'
    and (chunking operator(pg_catalog.->>) 'implementation') is distinct from 'none' then
        raise exception 'chunking must be none for column destination';
    end if;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;


-------------------------------------------------------------------------------
-- _evaluate_destination
create or replace function ai._evaluate_destination
( destination pg_catalog.jsonb,
source_schema pg_catalog.name,
source_table pg_catalog.name
) returns jsonb
as $func$
-- Resolves a destination config into concrete object names.
-- 'table': target_schema/view_schema default to source_schema. target_table
--   defaults to '<destination>_store', else '<source_table>_embedding_store'.
--   view_name defaults to '<destination>', else '<source_table>_embedding'.
-- 'column': the embedding_column is passed through.
-- Raises for any other implementation.
declare
    _target_schema pg_catalog.name;
    _target_table pg_catalog.name;
    _view_schema pg_catalog.name;
    _view_name pg_catalog.name;
begin
    if destination operator(pg_catalog.->>) 'implementation' = 'table' then
        _target_schema = coalesce(destination operator(pg_catalog.->>) 'target_schema', source_schema);
        _target_table = case
            when destination operator(pg_catalog.->>) 'target_table' is not null then destination operator(pg_catalog.->>) 'target_table'
            when destination operator(pg_catalog.->>) 'destination' is not null then pg_catalog.concat(destination operator(pg_catalog.->>) 'destination', '_store')
            else pg_catalog.concat(source_table, '_embedding_store')
        end;
        -- honour an explicit view_schema from the config; fall back to the source schema
        _view_schema = coalesce(destination operator(pg_catalog.->>) 'view_schema', source_schema);
        _view_name = case
            when destination operator(pg_catalog.->>) 'view_name' is not null then destination operator(pg_catalog.->>) 'view_name'
            when destination operator(pg_catalog.->>) 'destination' is not null then destination operator(pg_catalog.->>) 'destination'
            else pg_catalog.concat(source_table, '_embedding')
        end;
        return json_build_object
        ( 'implementation', 'table'
        , 'config_type', 'destination'
        , 'target_schema', _target_schema
        , 'target_table', _target_table
        , 'view_schema', _view_schema
        , 'view_name', _view_name
        );
    elseif destination operator(pg_catalog.->>) 'implementation' = 'column' then
        return json_build_object
        ( 'implementation', 'column'
        , 'config_type', 'destination'
        , 'embedding_column', destination operator(pg_catalog.->>) 'embedding_column'
        );
    else
        raise exception 'invalid implementation for destination config';
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

create or replace function ai._validate_destination_can_create_objects(destination pg_catalog.jsonb) returns void
as $func$
-- For a 'table' destination, raises duplicate_object when a relation with the
-- configured view name or target table name already exists. Other
-- implementations are not checked.
declare
    _view_schema pg_catalog.text;
    _view_name pg_catalog.text;
    _target_schema pg_catalog.text;
    _target_table pg_catalog.text;
begin
    if (destination operator(pg_catalog.->>) 'implementation') is distinct from 'table' then
        return;
    end if;

    _view_schema = destination operator(pg_catalog.->>) 'view_schema';
    _view_name = destination operator(pg_catalog.->>) 'view_name';
    _target_schema = destination operator(pg_catalog.->>) 'target_schema';
    _target_table = destination operator(pg_catalog.->>) 'target_table';

    -- the view name must be free
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', _view_schema, _view_name)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate destination or view_name explicitly', _view_schema, _view_name
        using errcode = 'duplicate_object';
    end if;

    -- the target table name must be free
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', _target_schema, _target_table)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate destination or target_table explicitly', _target_schema, _target_table
        using errcode = 'duplicate_object';
    end if;
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

--------------------------------------------------------------------------------
-- 011-vectorizer-int.sql

-------------------------------------------------------------------------------
-- _vectorizer_source_pk
create or replace function ai._vectorizer_source_pk(source_table pg_catalog.regclass) returns pg_catalog.jsonb as
$func$
    -- Describes the primary key of source_table as a jsonb array with one object
    -- per key column. Each object holds attnum (column position in the table),
    -- pknum (position within the key), attname and typname (format_type output,
    -- including typmod). Elements are ordered by pknum so the stored description
    -- is deterministic. Returns null when the table has no primary key.
    select pg_catalog.jsonb_agg(x order by x.pknum)
    from
    (
        select e.attnum, e.pknum, a.attname, pg_catalog.format_type(y.oid, a.atttypmod) as typname
        from pg_catalog.pg_constraint k
        cross join lateral pg_catalog.unnest(k.conkey) with ordinality e(attnum, pknum)
        inner join pg_catalog.pg_attribute a
            on (k.conrelid operator(pg_catalog.=) a.attrelid
                and e.attnum operator(pg_catalog.=) a.attnum)
        inner join pg_catalog.pg_type y on (a.atttypid operator(pg_catalog.=) y.oid)
        where k.conrelid operator(pg_catalog.=) source_table
        and k.contype operator(pg_catalog.=) 'p'
    ) x
$func$
language sql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_grant_to_source
create or replace function ai._vectorizer_grant_to_source
( source_schema pg_catalog.name
, source_table pg_catalog.name
, grant_to pg_catalog.name[]
) returns void as
$func$
-- Grants usage on source_schema and select on source_schema.source_table to
-- every role in grant_to. Does nothing when there is no role to grant to.
declare
    _grantees pg_catalog.text;
begin
    -- quoted, comma-separated role list; null when grant_to is null, empty, or
    -- holds only nulls. An empty array (what ai.grant_to() returns when no default
    -- is configured) would otherwise render an invalid "grant ... to" statement.
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x;

    if _grantees is null then
        return;
    end if;

    -- grant usage on source schema to grant_to roles
    execute pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , source_schema
    , _grantees
    );

    -- grant select on source table to grant_to roles
    execute pg_catalog.format
    ( $sql$grant select on %I.%I to %s$sql$
    , source_schema
    , source_table
    , _grantees
    );
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_grant_to_vectorizer
create or replace function ai._vectorizer_grant_to_vectorizer(grant_to pg_catalog.name[]) returns void as
$func$
-- Grants usage on schema ai and select on ai.vectorizer to every role in
-- grant_to. Does nothing when there is no role to grant to.
declare
    _grantees pg_catalog.text;
begin
    -- quoted, comma-separated role list; null when grant_to is null, empty, or
    -- holds only nulls. An empty array would otherwise render an invalid
    -- "grant ... to" statement.
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x;

    if _grantees is null then
        return;
    end if;

    -- grant usage on schema ai to grant_to roles
    execute pg_catalog.format($sql$grant usage on schema ai to %s$sql$, _grantees);

    -- grant select on vectorizer table to grant_to roles
    execute pg_catalog.format($sql$grant select on ai.vectorizer to %s$sql$, _grantees);
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_destination_table
create or replace function ai._vectorizer_create_destination_table
(   source_schema pg_catalog.name
    , source_table pg_catalog.name
    , source_pk pg_catalog.jsonb
    , dimensions pg_catalog.int4
    , destination jsonb
    , grant_to pg_catalog.name[]
) returns void as
$func$
-- Creates the objects for a 'table' destination: the target table holding the
-- embeddings, then a view over the target and source tables.
-- NOTE(review): expects destination to already carry concrete names
-- (presumably the output of ai._evaluate_destination) — confirm with callers.
declare
    _target_schema pg_catalog.name = destination operator(pg_catalog.->>) 'target_schema';
    _target_table pg_catalog.name = destination operator(pg_catalog.->>) 'target_table';
    _view_schema pg_catalog.name = destination operator(pg_catalog.->>) 'view_schema';
    _view_name pg_catalog.name = destination operator(pg_catalog.->>) 'view_name';
begin
    perform ai._vectorizer_create_target_table
    ( source_pk, _target_schema, _target_table, dimensions, grant_to );

    perform ai._vectorizer_create_view
    ( _view_schema, _view_name
    , source_schema, source_table, source_pk
    , _target_schema, _target_table
    , grant_to
    );
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

------------------------------------------------------------------------------- 
-- _vectorizer_create_destination_column
create or replace function ai._vectorizer_create_destination_column
(   source_schema pg_catalog.name
    , source_table pg_catalog.name
    , dimensions pg_catalog.int4
    , destination jsonb
) returns void as
$func$
-- Creates the objects for a 'column' destination by adding the configured
-- embedding column to the source table.
begin
    perform ai._vectorizer_add_embedding_column
    ( source_schema
    , source_table
    , dimensions
    , cast(destination operator(pg_catalog.->>) 'embedding_column' as pg_catalog.name)
    );
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_add_embedding_column
create or replace function ai._vectorizer_add_embedding_column
( source_schema pg_catalog.name
, source_table pg_catalog.name
, dimensions pg_catalog.int4
, embedding_column pg_catalog.name
) returns void as
$func$
-- Adds a nullable vector(dimensions) column named embedding_column to
-- source_schema.source_table, with storage main. If a live column with that
-- name already exists, emits a notice and leaves the table untouched.
declare
    _sql pg_catalog.text;
    _column_exists pg_catalog.bool;
begin
    -- Check if embedding column already exists (dropped columns do not count)
    select exists(
        select 1
        from pg_catalog.pg_attribute a
        join pg_catalog.pg_class c on a.attrelid operator(pg_catalog.=) c.oid
        join pg_catalog.pg_namespace n on c.relnamespace operator(pg_catalog.=) n.oid
        where n.nspname operator(pg_catalog.=) source_schema
        and c.relname operator(pg_catalog.=) source_table
        and a.attname operator(pg_catalog.=) embedding_column
        and not a.attisdropped
    ) into _column_exists;

    if _column_exists then
        -- RAISE only understands bare % placeholders (format()'s %I would print a
        -- stray "I"), so quote the identifiers explicitly
        raise notice 'embedding column % already exists in %.%, skipping creation'
            , pg_catalog.quote_ident(embedding_column)
            , pg_catalog.quote_ident(source_schema)
            , pg_catalog.quote_ident(source_table);
        return;
    end if;

    -- Add embedding column to source table
    select pg_catalog.format(
        $sql$
        alter table %I.%I 
        add column %I @extschema:vector@.vector(%L) default null
        $sql$,
        source_schema, source_table, embedding_column, dimensions
    ) into strict _sql;

    execute _sql;

    select pg_catalog.format(
        $sql$alter table %I.%I alter column %I set storage main$sql$,
        source_schema, source_table, embedding_column
    ) into strict _sql;

    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp;
-------------------------------------------------------------------------------
-- _vectorizer_create_target_table
create or replace function ai._vectorizer_create_target_table
( source_pk pg_catalog.jsonb
, target_schema pg_catalog.name
, target_table pg_catalog.name
, dimensions pg_catalog.int4
, grant_to pg_catalog.name[]
) returns void as
$func$
-- Creates target_schema.target_table to hold embeddings. Columns: a uuid
-- primary key, copies of the source primary-key columns (from source_pk), a
-- chunk sequence number, the chunk text and a vector(dimensions) embedding.
-- (pk columns, chunk_seq) is unique. Then grants usage on the schema and
-- select/insert/update on the table to the roles in grant_to, if any.
declare
    _pk_cols pg_catalog.text;
    _grantees pg_catalog.text;
    _sql pg_catalog.text;
begin
    select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
    into strict _pk_cols
    from pg_catalog.jsonb_to_recordset(source_pk) x(pknum int, attname name)
    ;

    select pg_catalog.format
    ( $sql$
    create table %I.%I
    ( embedding_uuid uuid not null primary key default pg_catalog.gen_random_uuid()
    , %s
    , chunk_seq int not null
    , chunk text not null
    , embedding @extschema:vector@.vector(%L) not null
    , unique (%s, chunk_seq)
    )
    $sql$
    , target_schema, target_table
    , (
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( '%I %s not null'
            , x.attname
            , x.typname
            )
            , E'\n, '
            order by x.attnum
        )
        from pg_catalog.jsonb_to_recordset(source_pk)
            x(attnum int, attname name, typname name)
      )
    , dimensions
    , _pk_cols
    ) into strict _sql
    ;
    execute _sql;

    select pg_catalog.format
       ( $sql$alter table %I.%I alter column embedding set storage main$sql$
       , target_schema
       , target_table
       ) into strict _sql
    ;
    execute _sql;

    -- quoted, comma-separated role list; null when grant_to is null, empty, or
    -- holds only nulls. An empty array would otherwise render an invalid
    -- "grant ... to" statement, so the grants are skipped in that case.
    select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) x;

    if _grantees is not null then
        -- grant usage on target schema to grant_to roles
        execute pg_catalog.format
        ( $sql$grant usage on schema %I to %s$sql$
        , target_schema
        , _grantees
        );

        -- grant select, insert, update on target table to grant_to roles
        execute pg_catalog.format
        ( $sql$grant select, insert, update on %I.%I to %s$sql$
        , target_schema
        , target_table
        , _grantees
        );
    end if;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_view
-- Creates view_schema.view_name over the embedding rows of
-- target_schema.target_table (alias t), left-joined to
-- source_schema.source_table (alias s) on every primary-key column described by
-- source_pk. Embedding columns come from t. Each live source column is emitted
-- from t when it is a primary-key column (matched by attnum) and from s
-- otherwise. When grant_to is non-null, each listed role gets usage on the view
-- schema and select on the view.
create or replace function ai._vectorizer_create_view
( view_schema pg_catalog.name
, view_name pg_catalog.name
, source_schema pg_catalog.name
, source_table pg_catalog.name
, source_pk pg_catalog.jsonb
, target_schema pg_catalog.name
, target_table pg_catalog.name
, grant_to pg_catalog.name[]
) returns void as
$func$
declare
    _sql pg_catalog.text;
begin
    select pg_catalog.format
    ( $sql$
    create view %I.%I as
    select
      t.embedding_uuid
    , t.chunk_seq
    , t.chunk
    , t.embedding
    , %s
    from %I.%I t
    left outer join %I.%I s
    on (%s)
    $sql$
    , view_schema, view_name
    , (
        -- take primary keys from the target table and other columns from source
        -- this allows for join removal optimization
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( '%s.%I'
            , case when x.attnum is not null then 't' else 's' end
            , a.attname
            )
            , E'\n    , '
            order by a.attnum
        )
        from pg_catalog.pg_attribute a
        left outer join pg_catalog.jsonb_to_recordset(source_pk) x(attnum int) on (a.attnum operator(pg_catalog.=) x.attnum)
        where a.attrelid operator(pg_catalog.=) pg_catalog.format('%I.%I', source_schema, source_table)::pg_catalog.regclass::pg_catalog.oid
        and a.attnum operator(pg_catalog.>) 0
        and not a.attisdropped
      )
    , target_schema, target_table
    , source_schema, source_table
    , (
        -- join on every primary-key column. %I quotes names that need it
        -- (mixed case, spaces, reserved words); with %s such names would
        -- produce invalid or wrong DDL.
        select pg_catalog.string_agg
        (
            pg_catalog.format
            ( 't.%I = s.%I'
            , x.attname
            , x.attname
            )
            , ' and '
            order by x.pknum
        )
        from pg_catalog.jsonb_to_recordset(source_pk)
            x(pknum int, attname name)
      )
    ) into strict _sql;
    execute _sql;

    if grant_to is not null then
        -- grant usage on view schema to grant_to roles
        select pg_catalog.format
        ( $sql$grant usage on schema %I to %s$sql$
        , view_schema
        , (
            select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
            from pg_catalog.unnest(grant_to) x
          )
        ) into strict _sql;
        execute _sql;

        -- grant select on view to grant_to roles
        select pg_catalog.format
        ( $sql$grant select on %I.%I to %s$sql$
        , view_schema
        , view_name
        , (
            select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
            from pg_catalog.unnest(grant_to) x
          )
        ) into strict _sql;
        execute _sql;
    end if;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_queue_table
-- Creates the work queue queue_schema.queue_table. It has one NOT NULL column
-- per source primary-key column (name/type from source_pk, in attnum order)
-- plus the bookkeeping columns queued_at, loading_retries and
-- loading_retry_after. It also creates a non-unique index on the pk columns
-- (pknum order). When grant_to is non-null, each listed role gets usage on the
-- schema and select/insert/update/delete on the queue table.
create or replace function ai._vectorizer_create_queue_table
( queue_schema pg_catalog.name
, queue_table pg_catalog.name
, source_pk pg_catalog.jsonb
, grant_to pg_catalog.name[]
) returns void as
$func$
declare
    _col_defs pg_catalog.text;
    _idx_cols pg_catalog.text;
    _grantees pg_catalog.text;
begin
    -- "<col> <type> not null" for each pk column
    select pg_catalog.string_agg
        ( pg_catalog.format('%I %s not null', p.attname, p.typname)
        , E'\n, '
        order by p.attnum
        )
    into strict _col_defs
    from pg_catalog.jsonb_to_recordset(source_pk) p(attnum int, attname name, typname name)
    ;

    execute pg_catalog.format
    ( $sql$
      create table %I.%I
      ( %s
      , queued_at pg_catalog.timestamptz not null default now()
      , loading_retries pg_catalog.int4 not null default 0
      , loading_retry_after pg_catalog.timestamptz
      )
      $sql$
    , queue_schema, queue_table
    , _col_defs
    );

    -- index over the pk columns so queue lookups by key are cheap
    select pg_catalog.string_agg(pg_catalog.format('%I', p.attname), ', ' order by p.pknum)
    into strict _idx_cols
    from pg_catalog.jsonb_to_recordset(source_pk) p(pknum int, attname name)
    ;

    execute pg_catalog.format
    ( $sql$create index on %I.%I (%s)$sql$
    , queue_schema, queue_table
    , _idx_cols
    );

    if grant_to is null then
        return;
    end if;

    select pg_catalog.string_agg(pg_catalog.quote_ident(g), ', ')
    into _grantees
    from pg_catalog.unnest(grant_to) g
    ;

    execute pg_catalog.format
    ( $sql$grant usage on schema %I to %s$sql$
    , queue_schema
    , _grantees
    );

    -- select, insert, update and delete on the queue table
    execute pg_catalog.format
    ( $sql$grant select, insert, update, delete on %I.%I to %s$sql$
    , queue_schema
    , queue_table
    , _grantees
    );
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_queue_failed_table
-- Creates queue_schema.queue_failed_table. It has one NOT NULL column per
-- source primary-key column (name/type from source_pk, in attnum order) plus
-- created_at and failure_step, and a non-unique index on the pk columns
-- (pknum order). When grant_to is non-null, each listed role gets usage on the
-- schema and select/insert/update/delete on the table.
create or replace function ai._vectorizer_create_queue_failed_table
( queue_schema pg_catalog.name
, queue_failed_table pg_catalog.name
, source_pk pg_catalog.jsonb
, grant_to pg_catalog.name[]
) returns void as
$func$
declare
    _pk_defs pg_catalog.text;   -- column definitions, attnum order
    _pk_names pg_catalog.text;  -- quoted column names for the index, pknum order
    _roles pg_catalog.text;     -- quoted grantee list
begin
    -- both lists are derived from the same source_pk description in one pass
    select
      pg_catalog.string_agg
      ( pg_catalog.format('%I %s not null', c.attname, c.typname)
      , E'\n, '
      order by c.attnum
      )
    , pg_catalog.string_agg
      ( pg_catalog.format('%I', c.attname)
      , ', '
      order by c.pknum
      )
    into strict _pk_defs, _pk_names
    from pg_catalog.jsonb_to_recordset(source_pk) c(attnum int, pknum int, attname name, typname name)
    ;

    execute pg_catalog.format
    ( $sql$
      create table %I.%I
      ( %s
      , created_at pg_catalog.timestamptz not null default now()
      , failure_step pg_catalog.text not null default ''
      )
      $sql$
    , queue_schema, queue_failed_table
    , _pk_defs
    );

    execute pg_catalog.format
    ( $sql$create index on %I.%I (%s)$sql$
    , queue_schema, queue_failed_table
    , _pk_names
    );

    if grant_to is not null then
        select pg_catalog.string_agg(pg_catalog.quote_ident(r), ', ')
        into _roles
        from pg_catalog.unnest(grant_to) r
        ;

        execute pg_catalog.format
        ( $sql$grant usage on schema %I to %s$sql$
        , queue_schema
        , _roles
        );

        -- select, insert, update and delete on the failed-queue table
        execute pg_catalog.format
        ( $sql$grant select, insert, update, delete on %I.%I to %s$sql$
        , queue_schema
        , queue_failed_table
        , _roles
        );
    end if;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;
-------------------------------------------------------------------------------
-- _vectorizer_build_trigger_definition
-- Returns the PL/pgSQL body (begin ... end) for a vectorizer's source-table
-- trigger function. It only builds text and executes nothing.
--
-- When target_schema and target_table are both non-null (table destination),
-- the generated body does the following at row level:
--   * DELETE: deletes the matching target rows (matched on the pk via old.<col>).
--   * UPDATE: if any pk column changed, deletes the old target rows and queues
--     the new key. Otherwise it queues the key only if a "relevant" column
--     changed. Returns new.
--   * any other op (INSERT): queues the new key. Returns new.
-- At statement level, on TRUNCATE, it truncates both the target and the queue
-- table.
--
-- Otherwise (column destination), the body queues the key on INSERT, or on an
-- UPDATE where a relevant column changed, and always returns null.
--
-- source_pk is read as a jsonb array of records with at least attnum and
-- attname keys.
create or replace function ai._vectorizer_build_trigger_definition
( queue_schema pg_catalog.name
, queue_table pg_catalog.name
, target_schema pg_catalog.name
, target_table pg_catalog.name
, source_schema pg_catalog.name
, source_table pg_catalog.name
, source_pk pg_catalog.jsonb
) returns pg_catalog.text as
$func$
declare
    _pk_change_check pg_catalog.text;
    _delete_statement pg_catalog.text;
    _pk_columns pg_catalog.text;
    _pk_values pg_catalog.text;
    _func_def pg_catalog.text;
    _relevant_columns_check pg_catalog.text;
    _truncate_statement pg_catalog.text;
begin
    -- Pre-calculate all the parts we need
    -- _pk_columns (quoted names) and _pk_values (new.<col>) are both in attnum
    -- order, so they line up in "insert into queue (cols) values (vals)".
    select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
    into strict _pk_columns
    from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

    select pg_catalog.string_agg(pg_catalog.format('new.%I', x.attname), ', ' order by x.attnum)
    into strict _pk_values
    from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

    if target_schema is not null and target_table is not null then
        -- Create delete statement for deleted rows
        -- (matches target rows on every pk column using the old.<col> values)
        _delete_statement := format('delete from %I.%I where %s', target_schema, target_table,
            (select string_agg(format('%I = old.%I', attname, attname), ' and ')
            from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name)));

        -- Create the primary key change check expression
        select string_agg(
            format('old.%I IS DISTINCT FROM new.%I', attname, attname),
            ' OR '
        )
        into strict _pk_change_check
        from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

        -- a TRUNCATE of the source empties both the target table and the queue table
        _truncate_statement := format('truncate table %I.%I; truncate table %I.%I',
                                target_schema, target_table, queue_schema, queue_table);
    end if;

    -- Boolean SQL expression evaluated inside the generated trigger. It is true
    -- when any column value differs between old and new, ignoring any column
    -- named as the embedding_column of a 'column'-destination vectorizer on this
    -- source table. source_table/source_schema are embedded as quoted literals (%L).
    _relevant_columns_check := 
        pg_catalog.format('EXISTS (
            SELECT 1 FROM pg_catalog.jsonb_each(to_jsonb(old)) AS o(key, value)
            JOIN pg_catalog.jsonb_each(to_jsonb(new)) AS n(key, value) 
            ON o.key = n.key
            WHERE o.value IS DISTINCT FROM n.value
            AND o.key != ALL(
                SELECT config operator(pg_catalog.->) ''destination'' operator(pg_catalog.->>) ''embedding_column''
                FROM ai.vectorizer 
                WHERE source_table = %L AND source_schema = %L
                AND config operator(pg_catalog.->) ''destination'' operator(pg_catalog.->>) ''implementation'' operator(pg_catalog.=) ''column''
            )
        )', source_table, source_schema);

    if target_schema is not null and target_table is not null then
        -- table destination: after the placeholder substitutions below, this
        -- template text is returned verbatim
        _func_def := $def$
        begin
            if (TG_LEVEL = 'ROW') then
                if (TG_OP = 'DELETE') then
                    $DELETE_STATEMENT$;
                elsif (TG_OP = 'UPDATE') then
                    -- Check if the primary key has changed and queue the update
                    if $PK_CHANGE_CHECK$ then
                        $DELETE_STATEMENT$;
                        insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
                            values ($PK_VALUES$);
                    -- check if a relevant column has changed and queue the update
                    elsif $RELEVANT_COLUMNS_CHECK$ then
                        insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
                        values ($PK_VALUES$);
                    end if;

                    return new;
                else
                    insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
                    values ($PK_VALUES$);
                    return new;
                end if;

            elsif (TG_LEVEL = 'STATEMENT') then
                if (TG_OP = 'TRUNCATE') then
                    $TRUNCATE_STATEMENT$;
                end if;
                return null;
            end if;

            return null;
        end;
        $def$;

        -- Replace placeholders
        -- NOTE: $TARGET_SCHEMA$ and $TARGET_TABLE$ do not occur in the template
        -- above, so those two replace() calls have no effect.
        _func_def := replace(_func_def, '$DELETE_STATEMENT$', _delete_statement);
        _func_def := replace(_func_def, '$PK_CHANGE_CHECK$', _pk_change_check);
        _func_def := replace(_func_def, '$QUEUE_SCHEMA$', quote_ident(queue_schema));
        _func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
        _func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
        _func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
        _func_def := replace(_func_def, '$TARGET_SCHEMA$', quote_ident(target_schema));
        _func_def := replace(_func_def, '$TARGET_TABLE$', quote_ident(target_table));
        _func_def := replace(_func_def, '$RELEVANT_COLUMNS_CHECK$', _relevant_columns_check);
        _func_def := replace(_func_def, '$TRUNCATE_STATEMENT$', _truncate_statement);
    
    else
        -- column destination: there is no target table to maintain, so only
        -- INSERTs and relevant UPDATEs are queued
        _func_def := $def$
        begin
            if (TG_LEVEL = 'ROW') then
                if (TG_OP = 'UPDATE') then
                    if $RELEVANT_COLUMNS_CHECK$ then
                        insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
                        values ($PK_VALUES$);
                    end if;
                elseif (TG_OP = 'INSERT') then
                    insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
                    values ($PK_VALUES$);
                end if;
            end if;
            return null;
        end;
        $def$;
        _func_def := replace(_func_def, '$RELEVANT_COLUMNS_CHECK$', _relevant_columns_check);
        _func_def := replace(_func_def, '$QUEUE_SCHEMA$', quote_ident(queue_schema));
        _func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
        _func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
        _func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
    end if;
    return _func_def;
end;
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp;

-------------------------------------------------------------------------------
-- _vectorizer_create_source_trigger
-- Installs change tracking on source_schema.source_table:
--   1. creates the security-definer trigger function queue_schema.trigger_name(),
--      whose body comes from ai._vectorizer_build_trigger_definition;
--   2. revokes all privileges on that function from public;
--   3. creates the row-level trigger "trigger_name" (after insert/update/delete);
--   4. creates the statement-level trigger "<trigger_name>_truncate" (after truncate).
-- Both triggers execute the same function.
create or replace function ai._vectorizer_create_source_trigger
( trigger_name pg_catalog.name     -- Name for the trigger
, queue_schema pg_catalog.name     -- Schema containing the queue table
, queue_table pg_catalog.name      -- Table that will store queued items
, source_schema pg_catalog.name    -- Schema containing the watched table
, source_table pg_catalog.name     -- Table being watched for changes
, target_schema pg_catalog.name    -- Schema containing the target table for deletions
, target_table pg_catalog.name     -- Table where corresponding rows should be deleted
, source_pk pg_catalog.jsonb       -- JSON describing primary key columns to track
) returns void as
$func$
declare
    _sql pg_catalog.text;
begin
    -- The layout of this statement text is mirrored by the upgrade block that
    -- recreates trigger functions; change both together.
    execute pg_catalog.format
    ( $sql$
    create function %I.%I() returns trigger 
    as $trigger_def$ 
    %s
    $trigger_def$ language plpgsql volatile parallel safe security definer 
    set search_path to pg_catalog, pg_temp
    $sql$
    , queue_schema
    , trigger_name
    , ai._vectorizer_build_trigger_definition(queue_schema,
                                              queue_table,
                                              target_schema,
                                              target_table,
                                              source_schema,
                                              source_table,
                                              source_pk)
    );

    -- Revoke public permissions
    _sql := pg_catalog.format(
        'revoke all on function %I.%I() from public',
        queue_schema, trigger_name
    );
    execute _sql;

    -- Create the row-level trigger
    select pg_catalog.format(
        $sql$
        create trigger %I
        after insert or update or delete
        on %I.%I
        for each row execute function %I.%I()
        $sql$,
        trigger_name,
        source_schema, source_table,
        queue_schema, trigger_name
    ) into strict _sql
    ;
    execute _sql;
    
    -- Create the statement-level trigger for TRUNCATE
    -- Note: Using the same trigger function but with a different event and level.
    -- The "_truncate" suffix is appended BEFORE quoting. Formatting '%I_truncate'
    -- would emit "Name"_truncate (a syntax error) whenever trigger_name needs
    -- quoting. For names that need no quoting the result is identical.
    select pg_catalog.format(
        $sql$
        create trigger %I
        after truncate
        on %I.%I
        for each statement execute function %I.%I()
        $sql$,
        pg_catalog.concat(trigger_name, '_truncate'),
        source_schema, source_table,
        queue_schema, trigger_name
    ) into strict _sql
    ;
    execute _sql;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-- This code block recreates all trigger functions for vectorizers to make sure
-- they have the most recent code for the function.
-- For every row of ai.vectorizer it regenerates the body with
-- ai._vectorizer_build_trigger_definition and installs it via
-- "create or replace function <queue_schema>.<trigger_name>()".
do $upgrade_block$
declare
    _vec record;
    _target_schema pg_catalog.name;
    _target_table pg_catalog.name;
    _destination_type pg_catalog.text;
begin
    -- Find all vectorizers
    for _vec in (
        select 
            v.id,
            v.source_schema,
            v.source_table,
            v.source_pk,
            v.trigger_name,
            v.queue_schema,
            v.queue_table,
            v.config
        from ai.vectorizer v
    )
    loop
        -- RAISE substitutes bare % placeholders; "%s" would print a stray "s"
        raise notice 'Recreating trigger function for vectorizer ID %', _vec.id;
        
        _destination_type := _vec.config->'destination'->>'implementation';
        if _destination_type = 'table' then
            _target_schema := _vec.config->'destination'->>'target_schema';
            _target_table := _vec.config->'destination'->>'target_table';
        else -- destination column works with no target table in the trigger def
            -- NOTE: any implementation other than 'table' (including a missing
            -- destination) takes this branch
            _target_schema := null;
            _target_table := null;
        end if;

        execute format
        (
        --weird indent is intentional to make the sql functions look the same as during a fresh install
        --otherwise the snapshots will not match during upgrade testing.
            $sql$
    create or replace function %I.%I() returns trigger 
    as $trigger_def$ 
    %s 
    $trigger_def$ language plpgsql volatile parallel safe security definer 
    set search_path to pg_catalog, pg_temp
    $sql$
            , _vec.queue_schema, _vec.trigger_name,
            ai._vectorizer_build_trigger_definition(_vec.queue_schema,
                                                    _vec.queue_table,
                                                    _target_schema,
                                                    _target_table,
                                                    _vec.source_schema,
                                                    _vec.source_table,
                                                    _vec.source_pk)
        );
    end loop;
end;
$upgrade_block$;

-------------------------------------------------------------------------------
-- _vectorizer_vector_index_exists
-- Returns true when target_schema.target_table already has an index whose first
-- key column is column_name and whose definition uses the access method named
-- by indexing->>'implementation'. Raises for any non-null implementation other
-- than 'diskann' or 'hnsw'.
create or replace function ai._vectorizer_vector_index_exists
( target_schema pg_catalog.name
, target_table pg_catalog.name
, indexing pg_catalog.jsonb
, column_name pg_catalog.name default 'embedding'
) returns pg_catalog.bool as
$func$
declare
    _implementation pg_catalog.text;
    _found pg_catalog.bool;
begin
    _implementation = pg_catalog.jsonb_extract_path_text(indexing, 'implementation');
    if _implementation not in ('diskann', 'hnsw') then
        -- RAISE substitutes bare %; "%s" would append a stray "s" to the value
        raise exception 'unrecognized index implementation: %', _implementation;
    end if;

    -- look for an index on the target table where the indexed column is the "embedding" column
    -- and the index is using the correct implementation
    select pg_catalog.count(*) filter
    ( where pg_catalog.pg_get_indexdef(i.indexrelid)
      ilike pg_catalog.concat('% using ', _implementation, ' %')
    ) > 0 into _found
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    inner join pg_catalog.pg_index i on (k.oid operator(pg_catalog.=) i.indrelid)
    inner join pg_catalog.pg_attribute a
        on (k.oid operator(pg_catalog.=) a.attrelid
        and a.attname operator(pg_catalog.=) column_name
        and a.attnum operator(pg_catalog.=) i.indkey[0]
        )
    where n.nspname operator(pg_catalog.=) target_schema
    and k.relname operator(pg_catalog.=) target_table
    ;
    return coalesce(_found, false);
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_should_create_vector_index
-- Decides whether a vector index should be built for this vectorizer now.
-- It returns false when any of the following holds:
--   * there is no indexing config;
--   * its implementation is missing or 'none';
--   * a matching index already exists;
--   * create_when_queue_empty (default true) is set and the queue table has rows.
-- Otherwise it returns true once the embedding table holds at least
-- indexing->>'min_rows' rows (default 0).
-- The embedding table is destination target_schema/target_table, falling back
-- to the source table. The column is destination embedding_column, falling back
-- to 'embedding'.
create or replace function ai._vectorizer_should_create_vector_index(vectorizer ai.vectorizer) returns boolean
as $func$
declare
    _indexing pg_catalog.jsonb;
    _implementation pg_catalog.text;
    _create_when_queue_empty pg_catalog.bool;
    _sql pg_catalog.text;
    _count pg_catalog.int8;
    _min_rows pg_catalog.int8;
    _schema_name pg_catalog.name;
    _table_name pg_catalog.name;
    _column_name pg_catalog.name;
begin
    -- grab the indexing config
    _indexing = pg_catalog.jsonb_extract_path(vectorizer.config, 'indexing');
    if _indexing is null then
        return false;
    end if;

    -- grab the indexing config's implementation
    _implementation = pg_catalog.jsonb_extract_path_text(_indexing, 'implementation');
    -- if implementation is missing or none, exit
    if _implementation is null or _implementation = 'none' then
        return false;
    end if;

    _schema_name = coalesce(vectorizer.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_schema', vectorizer.source_schema);
    _table_name = coalesce(vectorizer.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_table', vectorizer.source_table);
    _column_name = coalesce(vectorizer.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'embedding_column', 'embedding');
    -- see if the index already exists. if so, exit
    if ai._vectorizer_vector_index_exists(_schema_name, _table_name, _indexing, _column_name) then
        return false;
    end if;

    -- if flag set, only attempt to create the vector index if the queue table is empty
    _create_when_queue_empty = coalesce(pg_catalog.jsonb_extract_path(_indexing, 'create_when_queue_empty')::pg_catalog.bool, true);
    if _create_when_queue_empty then
        -- only emptiness matters: the LIMIT inside the subquery stops after one
        -- row. A "count(...) ... limit 1" would scan the entire queue, because
        -- the LIMIT applies after the aggregate.
        select pg_catalog.format
        ( $sql$select pg_catalog.count(*) from (select 1 from %I.%I limit 1) x$sql$
        , vectorizer.queue_schema
        , vectorizer.queue_table
        ) into strict _sql
        ;
        execute _sql into _count;
        if _count operator(pg_catalog.>) 0 then
            raise notice 'queue for %.% is not empty. skipping vector index creation', _schema_name, _table_name;
            return false;
        end if;
    end if;

    -- if min_rows has a value
    _min_rows = coalesce(pg_catalog.jsonb_extract_path_text(_indexing, 'min_rows')::pg_catalog.int8, 0);
    if _min_rows > 0 then
        -- count the rows in the target table, stopping at min_rows
        select pg_catalog.format
        ( $sql$select pg_catalog.count(*) from (select 1 from %I.%I limit %L) x$sql$
        , _schema_name
        , _table_name
        , _min_rows
        ) into strict _sql
        ;
        execute _sql into _count;
    end if;

    -- if we have met or exceeded min_rows, create the index
    return coalesce(_count, 0) >= _min_rows;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_create_vector_index
-- Creates a diskann or hnsw index on target_schema.target_table(column_name),
-- configured from indexing.
-- Concurrency: a transaction-level advisory lock keyed on (1982010642, table
-- oid cast to int4) lets only one session build it at a time. If the lock is
-- busy it warns and returns. If the index already exists it notices and returns.
--   diskann: storage_layout, num_neighbors, search_list_size, max_alpha,
--            num_dimensions and num_bits_per_dimension become WITH options.
--   hnsw:    m and ef_construction become WITH options. indexing->>'opclass'
--            is qualified with the vector extension's schema.
-- Raises for any other implementation.
create or replace function ai._vectorizer_create_vector_index
( target_schema pg_catalog.name
, target_table pg_catalog.name
, indexing pg_catalog.jsonb
, column_name pg_catalog.name default 'embedding'
) returns void as
$func$
declare
    _key1 pg_catalog.int4 = 1982010642;
    _key2 pg_catalog.int4;
    _implementation pg_catalog.text;
    _with_count pg_catalog.int8;
    _with pg_catalog.text;
    _ext_schema pg_catalog.name;
    _sql pg_catalog.text;
begin

    -- use the target table's oid as the second key for the advisory lock
    select k.oid::pg_catalog.int4 into strict _key2
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.relname operator(pg_catalog.=) target_table
    and n.nspname operator(pg_catalog.=) target_schema
    ;

    -- try to grab a transaction-level advisory lock specific to the target table
    -- if we get it, no one else is building the vector index. proceed
    -- if we don't get it, someone else is already working on it. abort
    if not pg_catalog.pg_try_advisory_xact_lock(_key1, _key2) then
        raise warning 'another process is already building a vector index on %.%', target_schema, target_table;
        return;
    end if;

    -- double-check that the index doesn't exist now that we're holding the advisory lock
    -- nobody likes redundant indexes
    if ai._vectorizer_vector_index_exists(target_schema, target_table, indexing, column_name) then
        raise notice 'the vector index on %.% already exists', target_schema, target_table;
        return;
    end if;

    _implementation = pg_catalog.jsonb_extract_path_text(indexing, 'implementation');
    case _implementation
        when 'diskann' then
            select
              pg_catalog.count(*)
            , pg_catalog.string_agg
              ( case w.key
                  when 'storage_layout' then pg_catalog.format('%s=%L', w.key, w.value)
                  when 'max_alpha' then pg_catalog.format('%s=%s', w.key, w.value::pg_catalog.float8)
                  else pg_catalog.format('%s=%s', w.key, w.value::pg_catalog.int4)
                end
              , ', '
              )
            into strict
              _with_count
            , _with
            from pg_catalog.jsonb_each_text(indexing) w
            where w.key in
            ( 'storage_layout'
            , 'num_neighbors'
            , 'search_list_size'
            , 'max_alpha'
            , 'num_dimensions'
            , 'num_bits_per_dimension'
            )
            ;

            select pg_catalog.format
            ( $sql$create index on %I.%I using diskann (%I)%s$sql$
            , target_schema, target_table
            , column_name
            , case when _with_count operator(pg_catalog.>) 0
                then pg_catalog.format(' with (%s)', _with)
                else ''
              end
            ) into strict _sql;
            execute _sql;
        when 'hnsw' then
            select
              pg_catalog.count(*)
            , pg_catalog.string_agg(pg_catalog.format('%s=%s', w.key, w.value::pg_catalog.int4), ', ')
            into strict
              _with_count
            , _with
            from pg_catalog.jsonb_each_text(indexing) w
            where w.key in ('m', 'ef_construction')
            ;

            -- the opclass lives in the schema the vector extension is installed in
            select n.nspname into strict _ext_schema
            from pg_catalog.pg_extension x
            inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
            where x.extname operator(pg_catalog.=) 'vector'
            ;

            select pg_catalog.format
            ( $sql$create index on %I.%I using hnsw (%I %I.%s)%s$sql$
            , target_schema, target_table
            , column_name
            , _ext_schema
            , indexing operator(pg_catalog.->>) 'opclass'
            , case when _with_count operator(pg_catalog.>) 0
                then pg_catalog.format(' with (%s)', _with)
                else ''
              end
            ) into strict _sql;
            execute _sql;
        else
            -- RAISE substitutes bare %; "%s" would append a stray "s" to the value
            raise exception 'unrecognized index implementation: %', _implementation;
    end case;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;


-------------------------------------------------------------------------------
-- _vectorizer_schedule_job
-- Schedules the background job that runs ai._vectorizer_job for vectorizer_id
-- and returns the new job id. Behavior depends on scheduling->>'implementation':
--   'timescaledb': requires both the ai and timescaledb extensions. It calls
--                  <timescaledb schema>.add_job, passing whichever of
--                  schedule_interval, initial_start, fixed_schedule and
--                  timezone are present in scheduling, plus
--                  config {"vectorizer_id": vectorizer_id}.
--   'none':        schedules nothing and returns null.
--   anything else (including missing): raises.
create or replace function ai._vectorizer_schedule_job
( vectorizer_id pg_catalog.int4
, scheduling pg_catalog.jsonb
) returns pg_catalog.int8 as
$func$
declare
    _implementation pg_catalog.text;
    _tsdb_schema pg_catalog.name;
    _add_job_args pg_catalog.text;
    _sql pg_catalog.text;
    _job_id pg_catalog.int8;
begin
    _implementation := pg_catalog.jsonb_extract_path_text(scheduling, 'implementation');

    if _implementation operator(pg_catalog.=) 'none' then
        return null;
    end if;
    if _implementation is distinct from 'timescaledb' then
        raise exception 'scheduling implementation not recognized';
    end if;

    -- timescaledb scheduling also needs the ai extension present
    if not exists
    ( select 1
      from pg_catalog.pg_extension x
      where x.extname operator(pg_catalog.=) 'ai'
    ) then
        raise exception 'ai extension not found but it is needed for timescaledb scheduling.';
    end if;

    -- schema timescaledb is installed in (null when it is not installed)
    select n.nspname into _tsdb_schema
    from pg_catalog.pg_extension x
    inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
    where x.extname operator(pg_catalog.=) 'timescaledb'
    ;
    if _tsdb_schema is null then
        raise exception 'timescaledb extension not found';
    end if;

    -- named add_job arguments, in a fixed order, for the keys that are present
    select pg_catalog.string_agg
        ( pg_catalog.format('%s=>%L', s.key, s.value)
        , ', '
        order by a.ord
        )
    into _add_job_args
    from pg_catalog.jsonb_each_text(scheduling) s
    inner join
    pg_catalog.unnest(array['schedule_interval', 'initial_start', 'fixed_schedule', 'timezone']) with ordinality a(key, ord)
    on (s.key = a.key)
    ;

    _sql := pg_catalog.format
    ( $$select %I.add_job('ai._vectorizer_job'::pg_catalog.regproc, %s, config=>%L)$$
    , _tsdb_schema
    , _add_job_args
    , pg_catalog.jsonb_build_object('vectorizer_id', vectorizer_id)::pg_catalog.text
    );
    execute _sql into strict _job_id;
    return _job_id;
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _vectorizer_job
-- Background worker for a single vectorizer. The (job_id, config) signature is
-- what the timescaledb add_job call in ai._vectorizer_schedule_job targets;
-- config must carry {"vectorizer_id": <int>}.
-- Steps:
--   1. load the vectorizer row (errors if it no longer exists)
--   2. create the vector index when ai._vectorizer_should_create_vector_index
--      says so (on the target table or on the source table's embedding column)
--   3. if the queue has at least one unlocked row, call ai.execute_vectorizer
--      once per 50 queued items (the queue is sampled up to 501 rows), at most 10 times
-- This is a procedure so it can COMMIT between steps. PostgreSQL forbids
-- transaction control in procedures that carry a SET clause, so search_path is
-- pinned with SET LOCAL instead. Each COMMIT ends the transaction and discards
-- that setting, which is why it is re-applied after every commit.
create or replace procedure ai._vectorizer_job
( job_id pg_catalog.int4 default null
, config pg_catalog.jsonb default null
) as
$func$
declare
    _vectorizer_id pg_catalog.int4;
    _vec ai.vectorizer%rowtype;
    _sql pg_catalog.text;
    _found pg_catalog.bool;
    _count pg_catalog.int8;
    _should_create_vector_index pg_catalog.bool;
begin
    set local search_path = pg_catalog, pg_temp;
    if config is null then
        raise exception 'config is null';
    end if;

    -- get the vectorizer id from the config
    select pg_catalog.jsonb_extract_path_text(config, 'vectorizer_id')::pg_catalog.int4
    into strict _vectorizer_id
    ;

    -- get the vectorizer
    select * into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) _vectorizer_id
    ;

    commit;
    set local search_path = pg_catalog, pg_temp;

    _should_create_vector_index = ai._vectorizer_should_create_vector_index(_vec);

    -- if the conditions are right, create the vectorizer index
    if _should_create_vector_index and _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation' operator(pg_catalog.=) 'table' then
        commit;
        set local search_path = pg_catalog, pg_temp;
        perform ai._vectorizer_create_vector_index
        (_vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_schema'
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_table'
        , pg_catalog.jsonb_extract_path(_vec.config, 'indexing')
        );
    elsif _should_create_vector_index and _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation' operator(pg_catalog.=) 'column' then
        commit;
        set local search_path = pg_catalog, pg_temp;
        perform ai._vectorizer_create_vector_index
        (_vec.source_schema
        , _vec.source_table
        , pg_catalog.jsonb_extract_path(_vec.config, 'indexing')
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'embedding_column'
        );
    end if;

    commit;
    set local search_path = pg_catalog, pg_temp;

    -- if there is at least one item in the queue, we need to execute the vectorizer.
    -- skip locked: rows already claimed by a running worker do not count
    select pg_catalog.format
    ( $sql$
    select true
    from %I.%I
    for update skip locked
    limit 1
    $sql$
    , _vec.queue_schema, _vec.queue_table
    ) into strict _sql
    ;
    execute _sql into _found;
    commit;
    set local search_path = pg_catalog, pg_temp;
    if coalesce(_found, false) is true then
        -- count total items in the queue, scanning at most 501 rows.
        -- the subquery alias is required on PostgreSQL < 16
        select pg_catalog.format
        ( $sql$select pg_catalog.count(1) from (select 1 from %I.%I limit 501) as q$sql$
        , _vec.queue_schema, _vec.queue_table
        ) into strict _sql
        ;
        execute _sql into strict _count;
        commit;
        set local search_path = pg_catalog, pg_temp;
        -- for every 50 items in the queue, execute a vectorizer max out at 10 vectorizers
        _count = least(pg_catalog.ceil(_count::pg_catalog.float8 / 50.0::pg_catalog.float8), 10::pg_catalog.float8)::pg_catalog.int8;
        raise debug 'job_id %: executing % vectorizers...', job_id, _count;
        while _count > 0 loop
            -- execute the vectorizer
            perform ai.execute_vectorizer(_vectorizer_id);
            _count = _count - 1;
        end loop;
    end if;
    commit;
    set local search_path = pg_catalog, pg_temp;
end
$func$
language plpgsql security invoker
;

-------------------------------------------------------------------------------
-- execute_vectorizer by vectorizer name
-- Looks up the vectorizer's id from its name and delegates to the int4
-- overload. The lookup uses STRICT, so an unknown name raises no_data_found
-- and more than one matching row raises too_many_rows.
create or replace function ai.execute_vectorizer(vectorizer_name pg_catalog.text) returns void
as $func$
declare
    _id pg_catalog.int4;
begin
    select v.id
    from ai.vectorizer v
    where v.name operator(pg_catalog.=) vectorizer_name
    into strict _id
    ;

    -- hand off to the id-based overload
    perform ai.execute_vectorizer(_id);
end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

--------------------------------------------------------------------------------
-- 012-vectorizer-api.sql
-------------------------------------------------------------------------------
-- create_vectorizer
-- Creates a vectorizer that maintains embeddings for the rows of `source`.
-- Steps:
--   1. validate/resolve every config argument
--   2. create the destination (table or column)
--   3. create the queue and failed-queue tables and the source trigger
--   4. schedule the background job and record the ai.vectorizer row
--   5. grant access to grant_to and, if enqueue_existing, enqueue every
--      existing source row
-- Returns the new vectorizer id.
-- If a vectorizer with the resulting name already exists, it raises
-- duplicate_object. With if_not_exists => true it instead emits a notice and
-- returns the existing id without creating anything.
-- Only a superuser or a member of the source table's owning role may call it.
create or replace function ai.create_vectorizer
( source pg_catalog.regclass
, name pg_catalog.text default null
, destination pg_catalog.jsonb default ai.destination_table()
, loading pg_catalog.jsonb default null
, parsing pg_catalog.jsonb default ai.parsing_auto()
, embedding pg_catalog.jsonb default null
, chunking pg_catalog.jsonb default ai.chunking_recursive_character_text_splitter()
, indexing pg_catalog.jsonb default ai.indexing_default()
, formatting pg_catalog.jsonb default ai.formatting_python_template()
, scheduling pg_catalog.jsonb default ai.scheduling_default()
, processing pg_catalog.jsonb default ai.processing_default()
, queue_schema pg_catalog.name default null
, queue_table pg_catalog.name default null
, grant_to pg_catalog.name[] default ai.grant_to()
, enqueue_existing pg_catalog.bool default true
, if_not_exists pg_catalog.bool default false
) returns pg_catalog.int4
as $func$
declare
    _missing_roles pg_catalog.name[];
    _source_table pg_catalog.name;
    _source_schema pg_catalog.name;
    _trigger_name pg_catalog.name;
    _is_owner pg_catalog.bool;
    _dimensions pg_catalog.int4;
    _source_pk pg_catalog.jsonb;
    _vectorizer_id pg_catalog.int4;
    _existing_vectorizer_id pg_catalog.int4;
    _sql pg_catalog.text;
    _job_id pg_catalog.int8;
    _queue_failed_table pg_catalog.name;
begin
    -- make sure all the roles listed in grant_to exist.
    -- missing roles only produce a warning and are removed from grant_to
    if grant_to is not null then
        select
          pg_catalog.array_agg(r) filter (where r operator(pg_catalog.!=) 'public' and pg_catalog.to_regrole(r) is null) -- missing
        , pg_catalog.array_agg(r) filter (where r operator(pg_catalog.=) 'public' or pg_catalog.to_regrole(r) is not null) -- real roles
        into strict
          _missing_roles
        , grant_to
        from pg_catalog.unnest(grant_to) r
        ;
        if pg_catalog.array_length(_missing_roles, 1) operator(pg_catalog.>) 0 then
            raise warning 'one or more grant_to roles do not exist: %', _missing_roles;
        end if;
    end if;

    if embedding is null then
        raise exception 'embedding configuration is required';
    end if;

    if loading is null then
        raise exception 'loading configuration is required';
    end if;

    -- get source table name and schema name, and whether the current user
    -- is a member of the role that owns it
    select
      k.relname
    , n.nspname
    , pg_catalog.pg_has_role(pg_catalog.current_user(), k.relowner, 'MEMBER')
    into strict _source_table, _source_schema, _is_owner
    from pg_catalog.pg_class k
    inner join pg_catalog.pg_namespace n on (k.relnamespace operator(pg_catalog.=) n.oid)
    where k.oid operator(pg_catalog.=) source
    ;
    -- not an owner of the table, but superuser?
    if not _is_owner then
        select r.rolsuper into strict _is_owner
        from pg_catalog.pg_roles r
        where r.rolname operator(pg_catalog.=) pg_catalog.current_user()
        ;
    end if;

    if not _is_owner then
        raise exception 'only a superuser or the owner of the source table may create a vectorizer on it';
    end if;

    select (embedding operator(pg_catalog.->) 'dimensions')::pg_catalog.int4 into _dimensions;
    if _dimensions is null then
        raise exception 'dimensions argument is required';
    end if;

    -- get the source table's primary key definition
    select ai._vectorizer_source_pk(source) into strict _source_pk;
    if _source_pk is null or pg_catalog.jsonb_array_length(_source_pk) operator(pg_catalog.=) 0 then
        raise exception 'source table must have a primary key constraint';
    end if;

    _vectorizer_id = pg_catalog.nextval('ai.vectorizer_id_seq'::pg_catalog.regclass);
    _trigger_name = pg_catalog.concat('_vectorizer_src_trg_', _vectorizer_id);
    queue_schema = coalesce(queue_schema, 'ai');
    queue_table = coalesce(queue_table, pg_catalog.concat('_vectorizer_q_', _vectorizer_id));
    _queue_failed_table = pg_catalog.concat('_vectorizer_q_failed_', _vectorizer_id);

    -- validate the loading config
    perform ai._validate_loading(loading, _source_schema, _source_table);

    -- validate the parsing config
    perform ai._validate_parsing(
        parsing,
        loading,
        _source_schema,
        _source_table
    );

    -- validate the destination config
    perform ai._validate_destination(destination, chunking);

    -- validate the embedding config
    perform ai._validate_embedding(embedding);

    -- validate the chunking config
    perform ai._validate_chunking(chunking);

    -- if ai.indexing_default, resolve the default
    if indexing operator(pg_catalog.->>) 'implementation' = 'default' then
        indexing = ai._resolve_indexing_default();
    end if;

    -- validate the indexing config
    perform ai._validate_indexing(indexing);

    -- validate the formatting config
    perform ai._validate_formatting(formatting, _source_schema, _source_table);

    -- if ai.scheduling_default, resolve the default
    if scheduling operator(pg_catalog.->>) 'implementation' = 'default' then
        scheduling = ai._resolve_scheduling_default();
    end if;

    -- validate the scheduling config
    perform ai._validate_scheduling(scheduling);

    -- validate the processing config
    perform ai._validate_processing(processing);

    -- if scheduling is none then indexing must also be none
    if scheduling operator(pg_catalog.->>) 'implementation' = 'none'
    and indexing operator(pg_catalog.->>) 'implementation' != 'none' then
        raise exception 'automatic indexing is not supported without scheduling. set indexing=>ai.indexing_none() when scheduling=>ai.scheduling_none()';
    end if;

    -- evaluate the destination config
    destination = ai._evaluate_destination(destination, _source_schema, _source_table);

    -- default name: <target_schema>_<target_table> for table destinations,
    -- <source_schema>_<source_table>_<embedding_column> for column destinations
    if name is null then
        if destination operator(pg_catalog.->>) 'implementation' = 'table' then
            name = pg_catalog.format('%s_%s', destination operator(pg_catalog.->>) 'target_schema', destination operator(pg_catalog.->>) 'target_table');
        elseif destination operator(pg_catalog.->>) 'implementation' = 'column' then
            name = pg_catalog.format('%s_%s_%s', _source_schema, _source_table, destination operator(pg_catalog.->>) 'embedding_column');
        end if;
    end if;

    -- validate the name is available
    select id from ai.vectorizer
    where ai.vectorizer.name operator(pg_catalog.=) create_vectorizer.name
    into _existing_vectorizer_id
    ;
    if _existing_vectorizer_id is not null then
        if if_not_exists is false then
            raise exception 'a vectorizer named % already exists.', name
            using errcode = 'duplicate_object';
        end if;
        raise notice 'a vectorizer named % already exists, skipping', name;
        return _existing_vectorizer_id;
    end if;

    -- make sure queue table name is available.
    -- this must run after the if_not_exists check. Otherwise re-running with an
    -- explicit queue_table and if_not_exists => true would fail on the queue
    -- table created by the first call instead of returning the existing id
    if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, queue_table)) is not null then
        raise exception 'an object named %.% already exists. specify an alternate queue_table explicitly', queue_schema, queue_table
        using errcode = 'duplicate_object';
    end if;

    -- validate the destination can create objects after the if_not_exists check
    perform ai._validate_destination_can_create_objects(destination);

    -- grant select to source table
    perform ai._vectorizer_grant_to_source
    ( _source_schema
    , _source_table
    , grant_to
    );

    -- create the target table or column
    if destination operator(pg_catalog.->>) 'implementation' = 'table' then
        perform ai._vectorizer_create_destination_table
        ( _source_schema
        , _source_table
        , _source_pk
        , _dimensions
        , destination
        , grant_to
        );
    elseif destination operator(pg_catalog.->>) 'implementation' = 'column' then
        perform ai._vectorizer_create_destination_column
        ( _source_schema
        , _source_table
        , _dimensions
        , destination
        );
    else
        raise exception 'invalid implementation for destination';
    end if;

    -- create queue table
    perform ai._vectorizer_create_queue_table
    ( queue_schema
    , queue_table
    , _source_pk
    , grant_to
    );

    -- create queue failed table
    perform ai._vectorizer_create_queue_failed_table
    ( queue_schema
    , _queue_failed_table
    , _source_pk
    , grant_to
    );

    -- create trigger on source table to populate queue
    perform ai._vectorizer_create_source_trigger
    ( _trigger_name
    , queue_schema
    , queue_table
    , _source_schema
    , _source_table
    , destination operator(pg_catalog.->>) 'target_schema'
    , destination operator(pg_catalog.->>) 'target_table'
    , _source_pk
    );


    -- schedule the async ext job; a non-null job id is recorded in the scheduling config
    select ai._vectorizer_schedule_job
    (_vectorizer_id
    , scheduling
    ) into _job_id
    ;
    if _job_id is not null then
        scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
    end if;

    insert into ai.vectorizer
    ( id
    , source_schema
    , source_table
    , source_pk
    , trigger_name
    , queue_schema
    , queue_table
    , queue_failed_table
    , config
    , name
    )
    values
    ( _vectorizer_id
    , _source_schema
    , _source_table
    , _source_pk
    , _trigger_name
    , queue_schema
    , queue_table
    , _queue_failed_table
    , pg_catalog.jsonb_build_object
      ( 'version', '__version__'
      , 'loading', loading
      , 'parsing', parsing
      , 'embedding', embedding
      , 'chunking', chunking
      , 'indexing', indexing
      , 'formatting', formatting
      , 'scheduling', scheduling
      , 'processing', processing
      , 'destination', destination
      )
    , create_vectorizer.name
    );

    -- grant select on the vectorizer table
    perform ai._vectorizer_grant_to_vectorizer(grant_to);

    -- insert into queue any existing rows from source table
    -- (only the primary key columns are copied into the queue)
    if enqueue_existing is true then
        select pg_catalog.format
        ( $sql$
        insert into %I.%I (%s)
        select %s
        from %I.%I x
        ;
        $sql$
        , queue_schema, queue_table
        , (
            select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.attnum)
            from pg_catalog.jsonb_to_recordset(_source_pk) x(attnum int, attname name)
          )
        , (
            select pg_catalog.string_agg(pg_catalog.format('x.%I', x.attname), ', ' order by x.attnum)
            from pg_catalog.jsonb_to_recordset(_source_pk) x(attnum int, attname name)
          )
        , _source_schema, _source_table
        ) into strict _sql
        ;
        execute _sql;
    end if;
    return _vectorizer_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- disable_vectorizer_schedule
-- Sets ai.vectorizer.disabled = true for the given id. When the vectorizer is
-- scheduled with timescaledb, it also pauses the background job
-- (alter_job(..., scheduled=>false)).
-- Errors:
--   * an unknown vectorizer_id raises no_data_found (UPDATE ... INTO STRICT)
--   * a scheduling implementation other than 'none'/'timescaledb' raises
--     case_not_found (CASE has no ELSE)
create or replace function ai.disable_vectorizer_schedule(vectorizer_id pg_catalog.int4) returns void
as $func$
declare
    _vec ai.vectorizer%rowtype;
    _schedule pg_catalog.jsonb;
    _job_id pg_catalog.int8;
    _sql pg_catalog.text;
begin
    update ai.vectorizer v
    set disabled = true
    where v.id operator(pg_catalog.=) vectorizer_id
    returning * into strict _vec
    ;

    -- disable the scheduled job if exists
    _schedule = _vec.config operator(pg_catalog.->) 'scheduling';
    if _schedule is not null then
        case _schedule operator(pg_catalog.->>) 'implementation'
            when 'none' then -- ok
            when 'timescaledb' then
                _job_id = (_schedule operator(pg_catalog.->) 'job_id')::pg_catalog.int8;
                -- build the call against the schema timescaledb is installed in;
                -- _sql stays null when the extension is not installed
                select pg_catalog.format
                ( $$select %I.alter_job(job_id, scheduled=>false) from timescaledb_information.jobs where job_id = %L$$
                , n.nspname
                , _job_id
                ) into _sql
                from pg_catalog.pg_extension x
                inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
                where x.extname operator(pg_catalog.=) 'timescaledb'
                ;
                if _sql is not null then
                    execute _sql;
                end if;
        end case;
    end if;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-- disable_vectorizer_schedule by vectorizer name
-- Resolves the name to an id and delegates to the id-based overload.
-- An unknown name matches no rows, so nothing is called and no error is raised.
create or replace function ai.disable_vectorizer_schedule(name pg_catalog.text) returns void
as $func$
   select ai.disable_vectorizer_schedule(vec.id)
   from ai.vectorizer as vec
   where vec.name operator(pg_catalog.=) disable_vectorizer_schedule.name
   ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- enable_vectorizer_schedule
-- Sets ai.vectorizer.disabled = false for the given id. When the vectorizer is
-- scheduled with timescaledb, it also resumes the background job
-- (alter_job(..., scheduled=>true)).
-- Errors:
--   * an unknown vectorizer_id raises no_data_found (UPDATE ... INTO STRICT)
--   * a scheduling implementation other than 'none'/'timescaledb' raises
--     case_not_found
create or replace function ai.enable_vectorizer_schedule(vectorizer_id pg_catalog.int4) returns void
as $func$
declare
    _sched pg_catalog.jsonb;
    _job pg_catalog.int8;
    _tsdb_schema pg_catalog.name;
begin
    -- clear the flag and read back only the scheduling section of the config
    update ai.vectorizer v
    set disabled = false
    where v.id operator(pg_catalog.=) vectorizer_id
    returning v.config operator(pg_catalog.->) 'scheduling' into strict _sched
    ;

    -- no scheduling config: nothing else to do
    if _sched is null then
        return;
    end if;

    case _sched operator(pg_catalog.->>) 'implementation'
        when 'none' then
            null; -- nothing is scheduled
        when 'timescaledb' then
            _job = (_sched operator(pg_catalog.->) 'job_id')::pg_catalog.int8;
            -- find where timescaledb is installed (stays null if it is not)
            select n.nspname into _tsdb_schema
            from pg_catalog.pg_extension x
            inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
            where x.extname operator(pg_catalog.=) 'timescaledb'
            ;
            if _tsdb_schema is not null then
                execute pg_catalog.format
                ( $$select %I.alter_job(job_id, scheduled=>true) from timescaledb_information.jobs where job_id = %L$$
                , _tsdb_schema
                , _job
                );
            end if;
    end case;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-- enable_vectorizer_schedule by vectorizer name
-- Resolves the name to an id and delegates to the id-based overload.
-- An unknown name matches no rows, so nothing is called and no error is raised.
create or replace function ai.enable_vectorizer_schedule(name pg_catalog.text) returns void
as $func$
   select ai.enable_vectorizer_schedule(vec.id)
   from ai.vectorizer as vec
   where vec.name operator(pg_catalog.=) enable_vectorizer_schedule.name
   ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- drop_vectorizer
create or replace function ai.drop_vectorizer
( vectorizer_id pg_catalog.int4
, drop_all pg_catalog.bool default false
) returns void
as $func$
/* drop_vectorizer
This function does the following:
1. deletes the scheduled job if any (timescaledb scheduling only)
2. drops the trigger, and its companion <trigger>_truncate trigger, from the source table
3. drops the trigger function
4. drops the queue table and the failed-queue table
5. deletes the vectorizer row

UNLESS drop_all = true, it does NOT:
1. drop the target table containing the embeddings
2. drop the view joining the target and source
drop_all only has an effect for destinations with implementation 'table'.
For 'column' destinations the embedding column is never dropped.
An unknown vectorizer_id raises no_data_found.
*/
declare
    _vec ai.vectorizer%rowtype;
    _schedule pg_catalog.jsonb;
    _job_id pg_catalog.int8;
    _trigger pg_catalog.pg_trigger%rowtype;
    _sql pg_catalog.text;
begin
    -- grab the vectorizer we need to drop
    select v.* into strict _vec
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;

    -- delete the scheduled job if exists
    _schedule = _vec.config operator(pg_catalog.->) 'scheduling';
    if _schedule is not null then
        case _schedule operator(pg_catalog.->>) 'implementation'
            when 'none' then -- ok
            when 'timescaledb' then
                _job_id = (_schedule operator(pg_catalog.->) 'job_id')::pg_catalog.int8;
                -- no row (and found = false) when timescaledb is not installed
                select pg_catalog.format
                ( $$select %I.delete_job(job_id) from timescaledb_information.jobs where job_id = %L$$
                , n.nspname
                , _job_id
                ) into _sql
                from pg_catalog.pg_extension x
                inner join pg_catalog.pg_namespace n on (x.extnamespace operator(pg_catalog.=) n.oid)
                where x.extname operator(pg_catalog.=) 'timescaledb'
                ;
                if found then
                    execute _sql;
                end if;
        end case;
    end if;

    -- try to look up the trigger so we can find the function/procedure backing the trigger.
    -- only g.* is selected so the result exactly matches the pg_trigger row variable
    -- (select * would also return the pg_class and pg_namespace columns)
    select g.* into _trigger
    from pg_catalog.pg_trigger g
    inner join pg_catalog.pg_class k
    on (g.tgrelid operator(pg_catalog.=) k.oid
    and k.relname operator(pg_catalog.=) _vec.source_table)
    inner join pg_catalog.pg_namespace n
    on (k.relnamespace operator(pg_catalog.=) n.oid
    and n.nspname operator(pg_catalog.=) _vec.source_schema)
    where g.tgname operator(pg_catalog.=) _vec.trigger_name
    ;

    -- drop the trigger on the source table
    if found then
        select pg_catalog.format
        ( $sql$drop trigger %I on %I.%I$sql$
        , _trigger.tgname
        , _vec.source_schema
        , _vec.source_table
        ) into strict _sql
        ;
        execute _sql;

        -- companion truncate trigger, if one was created
        select pg_catalog.format
        ( $sql$drop trigger if exists %I on %I.%I$sql$
        , pg_catalog.format('%s_truncate', _trigger.tgname)
        , _vec.source_schema
        , _vec.source_table
        ) into _sql;
        execute _sql;

        -- drop the function/procedure backing the trigger
        select pg_catalog.format
        ( $sql$drop %s %I.%I()$sql$
        , case p.prokind when 'f' then 'function' when 'p' then 'procedure' end
        , n.nspname
        , p.proname
        ) into _sql
        from pg_catalog.pg_proc p
        inner join pg_catalog.pg_namespace n on (n.oid operator(pg_catalog.=) p.pronamespace)
        where p.oid operator(pg_catalog.=) _trigger.tgfoid
        ;
        if found then
            execute _sql;
        end if;
    else
        -- the trigger is missing. try to find the backing function by name and return type
        select pg_catalog.format
        ( $sql$drop %s %I.%I() cascade$sql$ -- cascade in case the trigger still exists somehow
        , case p.prokind when 'f' then 'function' when 'p' then 'procedure' end
        , n.nspname
        , p.proname
        ) into _sql
        from pg_catalog.pg_proc p
        inner join pg_catalog.pg_namespace n on (n.oid operator(pg_catalog.=) p.pronamespace)
        inner join pg_catalog.pg_type y on (p.prorettype operator(pg_catalog.=) y.oid)
        where n.nspname operator(pg_catalog.=) _vec.queue_schema
        and p.proname operator(pg_catalog.=) _vec.trigger_name
        and y.typname operator(pg_catalog.=) 'trigger'
        ;
        if found then
            execute _sql;
        end if;
    end if;

    -- drop the queue table if exists
    select pg_catalog.format
    ( $sql$drop table if exists %I.%I$sql$
    , _vec.queue_schema
    , _vec.queue_table
    ) into strict _sql;
    execute _sql;

    -- drop the failed queue table if exists
    select pg_catalog.format
    ( $sql$drop table if exists %I.%I$sql$
    , _vec.queue_schema
    , _vec.queue_failed_table
    ) into strict _sql;
    execute _sql;

    if drop_all and _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation' operator(pg_catalog.=) 'table' then
        -- drop the view if exists
        select pg_catalog.format
        ( $sql$drop view if exists %I.%I$sql$
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'view_schema'
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'view_name'
        ) into strict _sql;
        execute _sql;

        -- drop the target table if exists
        select pg_catalog.format
        ( $sql$drop table if exists %I.%I$sql$
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_schema'
        , _vec.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_table'
        ) into strict _sql;
        execute _sql;
    end if;

    -- delete the vectorizer row
    delete from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-- drop_vectorizer by vectorizer name
-- Resolves the name to an id and delegates to the id-based overload,
-- forwarding drop_all. An unknown name matches no rows and is a no-op.
create or replace function ai.drop_vectorizer(name pg_catalog.text, drop_all pg_catalog.bool default false) returns void
as $func$
   select ai.drop_vectorizer(vec.id, drop_vectorizer.drop_all)
   from ai.vectorizer as vec
   where vec.name operator(pg_catalog.=) drop_vectorizer.name
   ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp;

-------------------------------------------------------------------------------
-- vectorizer_queue_pending
-- Number of rows waiting in the vectorizer's queue table.
-- With exact_count => true every row is counted.
-- Otherwise at most 10001 rows are scanned. If that many exist, the maximum
-- bigint (9223372036854775807) is returned as an "at least 10001" sentinel.
-- Raises 'vectorizer has no queue table' when the queue schema/table is unset,
-- including when vectorizer_id matches no vectorizer.
create or replace function ai.vectorizer_queue_pending
( vectorizer_id pg_catalog.int4
, exact_count pg_catalog.bool default false
) returns pg_catalog.int8
as $func$
declare
    _schema pg_catalog.name;
    _table pg_catalog.name;
    _depth pg_catalog.int8;
begin
    select v.queue_schema, v.queue_table
    into _schema, _table
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;

    if _schema is null or _table is null then
        raise exception 'vectorizer has no queue table';
    end if;

    if exact_count then
        execute pg_catalog.format
        ( $sql$select count(1) from %I.%I$sql$
        , _schema, _table
        ) into strict _depth;
        return _depth;
    end if;

    -- bounded scan: never look at more than 10001 rows
    execute pg_catalog.format
    ( $sql$select count(*) from (select 1 from %I.%I limit 10001) as subselect$sql$
    , _schema, _table
    ) into strict _depth;

    -- hitting the limit means "more than 10000": report max bigint instead
    if _depth operator(pg_catalog.=) 10001 then
        return 9223372036854775807;
    end if;
    return _depth;
end;
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

-- vectorizer_queue_pending by vectorizer name
-- Resolves the name to an id and delegates to the id-based overload.
-- An unknown name matches no rows, so the result is null.
create or replace function ai.vectorizer_queue_pending
( name pg_catalog.text
, exact_count pg_catalog.bool default false
) returns pg_catalog.int8
as $func$
   select ai.vectorizer_queue_pending(vec.id, vectorizer_queue_pending.exact_count)
   from ai.vectorizer as vec
   where vec.name operator(pg_catalog.=) vectorizer_queue_pending.name
   ;
$func$ language sql stable security invoker
set search_path to pg_catalog, pg_temp;

-------------------------------------------------------------------------------
-- vectorizer_status
-- One row per vectorizer, with these columns:
--   * name and source table
--   * target table and view (only for 'table' destinations)
--   * embedding column: the configured column for 'column' destinations,
--     otherwise 'embedding'
--   * pending queue items, from ai.vectorizer_queue_pending with its default
--     bounded count (null when the current user cannot select from the queue
--     table)
--   * the disabled flag
create or replace view ai.vectorizer_status as
select
  v.id
, v.name
, pg_catalog.format('%I.%I', v.source_schema, v.source_table) as source_table
, case v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation'
    when 'table' then
      pg_catalog.format
      ( '%I.%I'
      , v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_schema'
      , v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'target_table'
      )
    else null
  end as target_table
, case v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation'
    when 'table' then
      pg_catalog.format
      ( '%I.%I'
      , v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'view_schema'
      , v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'view_name'
      )
    else null
  end as "view"
, case v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'implementation'
    when 'column' then
      pg_catalog.format('%I', v.config operator(pg_catalog.->) 'destination' operator(pg_catalog.->>) 'embedding_column')
    else 'embedding'
  end as embedding_column
, case
    when v.queue_table is not null
     and pg_catalog.has_table_privilege
         ( current_user
         , pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)
         , 'select'
         )
    then ai.vectorizer_queue_pending(v.id)
    else null
  end as pending_items
, v.disabled
from ai.vectorizer v
;

-------------------------------------------------------------------------------
-- vectorizer_embed
-- embeds input_text according to an embedding config object. the config's
-- "implementation" key selects the provider function:
--   openai   - model, api_key_name, dimensions and user are forwarded
--   ollama   - model, base_url (as host), keep_alive and options are forwarded
--   voyageai - model and api_key_name are forwarded; input_type defaults to
--              'query' when null
-- any other implementation (including a missing key) raises an exception.
-- input_type is only used by the voyageai branch.
-- NOTE(review): declared immutable although the providers are external
-- services — presumably intentional; confirm.
create or replace function ai.vectorizer_embed
( embedding_config pg_catalog.jsonb
, input_text pg_catalog.text
, input_type pg_catalog.text default null
) returns @extschema:vector@.vector
as $func$
declare
    _impl pg_catalog.text = embedding_config operator(pg_catalog.->>) 'implementation';
    _vec @extschema:vector@.vector;
begin
    if _impl operator(pg_catalog.=) 'openai' then
        _vec = ai.openai_embed
        ( embedding_config operator(pg_catalog.->>) 'model'
        , input_text
        , api_key_name=>(embedding_config operator(pg_catalog.->>) 'api_key_name')
        , dimensions=>(embedding_config operator(pg_catalog.->>) 'dimensions')::pg_catalog.int4
        , openai_user=>(embedding_config operator(pg_catalog.->>) 'user')
        );
    elsif _impl operator(pg_catalog.=) 'ollama' then
        _vec = ai.ollama_embed
        ( embedding_config operator(pg_catalog.->>) 'model'
        , input_text
        , host=>(embedding_config operator(pg_catalog.->>) 'base_url')
        , keep_alive=>(embedding_config operator(pg_catalog.->>) 'keep_alive')
        , embedding_options=>(embedding_config operator(pg_catalog.->) 'options')
        );
    elsif _impl operator(pg_catalog.=) 'voyageai' then
        _vec = ai.voyageai_embed
        ( embedding_config operator(pg_catalog.->>) 'model'
        , input_text
        , input_type=>coalesce(input_type, 'query')
        , api_key_name=>(embedding_config operator(pg_catalog.->>) 'api_key_name')
        );
    else
        -- a null implementation also lands here
        raise exception 'unsupported embedding implementation';
    end if;

    return _vec;
end
$func$ language plpgsql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- vectorizer_embed
-- embeds input_text with the "embedding" section of the config of the
-- vectorizer whose id is vectorizer_id. when no vectorizer has that id the
-- query yields no row and the function returns null.
create or replace function ai.vectorizer_embed
( vectorizer_id pg_catalog.int4
, input_text pg_catalog.text
, input_type pg_catalog.text default null
) returns @extschema:vector@.vector
as $func$
    select ai.vectorizer_embed(vz.config operator(pg_catalog.->) 'embedding', input_text, input_type)
    from ai.vectorizer as vz
    where vz.id operator(pg_catalog.=) vectorizer_id
    ;
$func$ language sql stable security invoker
set search_path to pg_catalog, pg_temp
;

-- vectorizer_embed
-- embeds input_text with the vectorizer identified by name by handing off to
-- the vectorizer-id overload. when no vectorizer has that name the query
-- yields no row and the function returns null.
create or replace function ai.vectorizer_embed
( name pg_catalog.text
, input_text pg_catalog.text
, input_type pg_catalog.text default null
) returns @extschema:vector@.vector
as $func$
    select ai.vectorizer_embed(vz.id, input_text, input_type)
    from ai.vectorizer as vz
    where vz.name operator(pg_catalog.=) vectorizer_embed.name
    ;
$func$ language sql stable security invoker
set search_path to pg_catalog, pg_temp;


-------------------------------------------------------------------------------
-- set_scheduling_timescaledb
-- ai.set_scheduling replaces the scheduling and indexing configuration of a
-- vectorizer and returns the vectorizer's updated config.
--   vectorizer_id - id of the vectorizer to reconfigure; must exist
--   scheduling    - scheduling config; implementation 'default' is resolved to
--                   ai._resolve_scheduling_default()
--   indexing      - indexing config; implementation 'default' is resolved to
--                   ai._resolve_indexing_default()
-- both configs are validated. the job recorded under scheduling.job_id in the
-- vectorizer's current config (if any) is deleted with public.delete_job, a
-- new job is requested from ai._vectorizer_schedule_job, and its id (when one
-- is returned) is stored under the new scheduling config's job_id key.
-- raises when the vectorizer does not exist, or when scheduling is none while
-- indexing is not none.
create or replace function ai.set_scheduling
( vectorizer_id pg_catalog.int4
, scheduling pg_catalog.jsonb default ai.scheduling_default()
, indexing pg_catalog.jsonb default ai.indexing_default()
) returns pg_catalog.jsonb
as $func$
declare
    _job_id pg_catalog.int8;
    _updated_config pg_catalog.jsonb;
begin
    -- fail fast on an unknown vectorizer: otherwise ai._vectorizer_schedule_job
    -- would still be called for it and the final update would match no row
    perform
    from ai.vectorizer v
    where v.id operator(pg_catalog.=) vectorizer_id
    ;
    if not found then
        raise exception 'invalid vectorizer id';
    end if;

    -- if ai.indexing_default, resolve the default
    if indexing operator(pg_catalog.->>) 'implementation' = 'default' then
        indexing = ai._resolve_indexing_default();
    end if;

    -- validate the indexing config
    perform ai._validate_indexing(indexing);

    -- if ai.scheduling_default, resolve the default
    if scheduling operator(pg_catalog.->>) 'implementation' = 'default' then
        scheduling = ai._resolve_scheduling_default();
    end if;

    -- validate the scheduling config
    perform ai._validate_scheduling(scheduling);

    -- if scheduling is none then indexing must also be none
    if scheduling operator(pg_catalog.->>) 'implementation' = 'none'
    and indexing operator(pg_catalog.->>) 'implementation' != 'none' then
        raise exception 'automatic indexing is not supported without scheduling. set indexing=>ai.indexing_none() when scheduling=>ai.scheduling_none()';
    end if;

    -- delete the job currently recorded for this vectorizer, if any
    perform public.delete_job(c.job_id::pg_catalog.int4)
    from
    (
        select v.config operator(pg_catalog.#>>) '{scheduling,job_id}' as job_id
        from ai.vectorizer v
        where v.id operator(pg_catalog.=) vectorizer_id
    ) c
    where c.job_id is not null
    ;

    -- schedule the async ext job
    select ai._vectorizer_schedule_job
    ( vectorizer_id
    , scheduling
    ) into _job_id
    ;

    -- only the job scheduled above may be recorded: drop any job_id the caller
    -- passed in. this keeps a stale id out of the config when no job was
    -- scheduled, and keeps jsonb_insert below from raising on an existing key
    scheduling = scheduling operator(pg_catalog.-) 'job_id';
    if _job_id is not null then
        scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
    end if;

    update ai.vectorizer v
    set config = v.config operator(pg_catalog.||) pg_catalog.jsonb_build_object
    ( 'scheduling'
    , scheduling
    , 'indexing'
    , indexing
    )
    where v.id operator(pg_catalog.=) vectorizer_id
    returning v.config into _updated_config
    ;

    return _updated_config;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 013-worker-tracking.sql
-- _worker_start
-- registers a vectorizer worker process by inserting a row into
-- ai.vectorizer_worker_process with the given version and expected heartbeat
-- interval, and returns the id of the new row.
create or replace function ai._worker_start(version text, expected_heartbeat_interval interval) returns uuid as $func$
declare
    _new_worker_id uuid;
begin
    -- a version compatibility check could be added here
    insert into ai.vectorizer_worker_process
    ( version
    , expected_heartbeat_interval
    )
    values
    ( _worker_start.version
    , _worker_start.expected_heartbeat_interval
    )
    returning id into _new_worker_id;

    return _new_worker_id;
end;
$func$ language plpgsql security invoker
set search_path to pg_catalog, pg_temp;

-- _worker_heartbeat
-- records a heartbeat for a worker process: stamps last_heartbeat with the
-- current clock time, increments heartbeat_count, and adds the success and
-- error counts reported since the previous heartbeat. a non-null
-- error_message becomes the process's last_error_message and stamps
-- last_error_at; a null one leaves the previous error details untouched.
-- an unknown worker_id updates no row.
create or replace function ai._worker_heartbeat(worker_id uuid, num_successes_since_last_heartbeat int, num_errors_since_last_heartbeat int, error_message text) returns void as $func$
declare
    _beat_at timestamptz = clock_timestamp();
begin
    update ai.vectorizer_worker_process set
      last_heartbeat = _beat_at
    , heartbeat_count = heartbeat_count + 1
    , error_count = error_count + num_errors_since_last_heartbeat
    , success_count = success_count + num_successes_since_last_heartbeat
    , last_error_message = coalesce(_worker_heartbeat.error_message, last_error_message)
    , last_error_at = case when _worker_heartbeat.error_message is null then last_error_at else _beat_at end
    where id = _worker_heartbeat.worker_id;
end;
$func$ language plpgsql security invoker
set search_path to pg_catalog, pg_temp;

-- _worker_progress
-- records the outcome of a batch that worker process worker_id handled for
-- vectorizer worker_vectorizer_id, first creating the vectorizer's row in
-- ai.vectorizer_worker_progress when none exists.
-- a null error_message is a success: last_success_at/last_success_process_id
-- are stamped. a non-null error_message stamps last_error_at,
-- last_error_message and last_error_process_id and increments error_count.
-- num_successes is added to success_count in both cases.
create or replace function ai._worker_progress(worker_id uuid, worker_vectorizer_id int, num_successes int, error_message text) returns void as $func$
declare
    _now timestamptz = clock_timestamp();
    _failed boolean = error_message is not null;
begin
    -- make sure a progress row exists for this vectorizer
    perform
    from ai.vectorizer_worker_progress p
    where p.vectorizer_id = worker_vectorizer_id;
    if not found then
        insert into ai.vectorizer_worker_progress (vectorizer_id) values (worker_vectorizer_id) on conflict do nothing;
    end if;

    update ai.vectorizer_worker_progress set
      last_success_at = case when _failed then last_success_at else _now end
    , last_success_process_id = case when _failed then last_success_process_id else worker_id end
    , last_error_at = case when _failed then _now else last_error_at end
    , last_error_message = case when _failed then error_message else last_error_message end
    , last_error_process_id = case when _failed then worker_id else last_error_process_id end
    , success_count = success_count + num_successes
    , error_count = error_count + case when _failed then 1 else 0 end
    where vectorizer_id = worker_vectorizer_id;
end;
$func$ language plpgsql security invoker
set search_path to pg_catalog, pg_temp;

--------------------------------------------------------------------------------
-- 014-semantic-catalog.sql

-------------------------------------------------------------------------------
-- embedding_sentence_transformers
-- builds an embedding config object for the sentence_transformers
-- implementation:
--   {"implementation": "sentence_transformers", "config_type": "embedding",
--    "model": <model>, "dimensions": <dimensions>}
create or replace function ai.embedding_sentence_transformers
( model text default 'nomic-ai/nomic-embed-text-v1.5'
, dimensions int4 default 768
) returns jsonb
as $func$
    -- build jsonb directly instead of building json and relying on a
    -- json-to-jsonb conversion to match the declared return type
    select jsonb_build_object
    ( 'implementation', 'sentence_transformers'
    , 'config_type', 'embedding'
    , 'model', model
    , 'dimensions', dimensions
    )
$func$ language sql immutable security invoker
set search_path to pg_catalog, pg_temp
;

-- TODO: function to validate embedding_sentence_transformers config

-------------------------------------------------------------------------------
-- _semantic_catalog_make_triggers
create or replace function ai._semantic_catalog_make_triggers(semantic_catalog_id int4) returns void
as $func$
/*
this function dynamically creates triggers on the obj, sql, and fact tables associated with a
semantic catalog. if any non-vector columns are updated, the vector columns are nulled out by
these triggers. this serves as the signal that the row should be reembedded.

for each table it (re)creates the trigger function ai.semantic_catalog_<tbl>_<id>_trig() from
the table's current columns, and creates the before-update row trigger when it does not exist
yet. sc_add_embedding and sc_drop_embedding call it after altering the tables so the generated
function always matches the current column set.

NOTE(review): every non-vector column participates in the change test, including id and usage;
updating usage alone will therefore also clear the embeddings — confirm that is intended.
*/
declare
    _tbl text;
    _sql text;
    _vec_type oid;
    _vec_nulls text;
    _col_diffs text;
begin
    -- find the oid of the vector data type
    select y.oid into strict _vec_type
    from pg_type y
    inner join pg_depend d on (y.oid = d.objid)
    inner join pg_extension x on (x.oid = d.refobjid)
    where d.classid = 'pg_catalog.pg_type'::regclass::oid
    and d.refclassid = 'pg_catalog.pg_extension'::regclass::oid
    and d.deptype = 'e'
    and x.extname = 'vector'
    and y.typname = 'vector'
    ;

    foreach _tbl in array array['obj', 'sql', 'fact']
    loop
        -- _vec_nulls: one "new.<col> = null;" per vector column
        -- _col_diffs: one change test per non-vector column, or-ed together
        -- column names are quoted with %I since embedding (vector) column names are user supplied.
        -- "is distinct from" is used because "!=" yields null when either side is null, which the
        -- generated "if" would treat as "unchanged" (e.g. description going from null to a value).
        -- when there are no vector columns _vec_nulls is null, which format renders as an empty
        -- string, leaving an empty then-branch.
        select string_agg
        (
          format
          ( $sql$new.%I = null;$sql$
          , a.attname
          )
        , E'\n        '
        order by a.attnum
        ) filter (where a.atttypid = _vec_type)
        , string_agg
        (
          format
          ( $sql$(old.%I is distinct from new.%I)$sql$
          , a.attname
          , a.attname
          )
        , E'\n    or '
        order by a.attnum
        ) filter (where a.atttypid != _vec_type)
        into strict 
          _vec_nulls
        , _col_diffs
        from pg_class k
        inner join pg_namespace n on (k.relnamespace = n.oid)
        inner join pg_attribute a on (k.oid = a.attrelid)
        where n.nspname = 'ai'
        and k.relname = format('semantic_catalog_%s_%s', _tbl, semantic_catalog_id)
        and a.attnum > 0
        and not a.attisdropped
        ;
        
        _sql = format(regexp_replace(
        $sql$
        create or replace function ai.semantic_catalog_%s_%s_trig() returns trigger
        as $trigger$
        declare
        begin
            if tg_op = 'UPDATE' and
            (  %s
            )
            then
                %s
            end if;
            return new;
        end
        $trigger$ language plpgsql volatile security invoker
        set search_path to pg_catalog, pg_temp
        $sql$, '^ {8}', '', 'gm') -- dedent 8 spaces
        , _tbl
        , semantic_catalog_id
        , _col_diffs
        , _vec_nulls
        );
        raise debug '%', _sql;
        execute _sql;
        
        -- the trigger itself only needs to be created once; it keeps pointing at the
        -- (re)created function above
        perform
        from pg_class k
        inner join pg_namespace n on (k.relnamespace = n.oid)
        inner join pg_trigger g on (g.tgrelid = k.oid)
        where n.nspname = 'ai'
        and k.relname = format('semantic_catalog_%s_%s', _tbl, semantic_catalog_id)
        and g.tgname = format('semantic_catalog_%s_%s_trig', _tbl, semantic_catalog_id)
        ;
        if not found then
            _sql = format(regexp_replace(
            $sql$
            create trigger semantic_catalog_%s_%s_trig 
            before update on ai.semantic_catalog_%s_%s
            for each row
            execute function ai.semantic_catalog_%s_%s_trig()
            $sql$, '^ {12}', '', 'gm') -- dedent 12 spaces
            , _tbl
            , semantic_catalog_id
            , _tbl
            , semantic_catalog_id
            , _tbl
            , semantic_catalog_id
            );
            raise debug '%', _sql;
            execute _sql;
        end if;
    end loop;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_add_embedding
-- registers a new embedding for a semantic catalog, adds a matching
-- vector(<dimensions>) column to the catalog's obj, sql, and fact tables, and
-- regenerates the catalog's update triggers.
--   config         - embedding config; must contain "dimensions"
--   embedding_name - column/embedding name; when null, emb<N> is generated
--                    where N is one more than the larger of the catalog's
--                    embedding count and the highest trailing number among
--                    its embedding names
--   catalog_name   - semantic catalog to extend
-- returns the inserted ai.semantic_catalog_embedding row.
-- raises when dimensions is missing, the catalog does not exist, or the
-- embedding name is not a plain identifier.
create or replace function ai.sc_add_embedding
( config jsonb
, embedding_name name default null
, catalog_name name default 'default'
) returns ai.semantic_catalog_embedding
as $func$
declare
    _config jsonb = sc_add_embedding.config;
    _embedding_name name = sc_add_embedding.embedding_name;
    _catalog_name name = sc_add_embedding.catalog_name;
    _catalog_id int4;
    _dims int4;
    _tbl text;
    _sql text;
    _embedding ai.semantic_catalog_embedding;
begin
    -- TODO: validate embedding config

    _dims = (_config->'dimensions')::int4;
    -- an explicit check rather than assert: asserts are skipped entirely when
    -- plpgsql.check_asserts is off
    if _dims is null then
        raise exception 'embedding config is missing dimensions';
    end if;
    
    -- grab the catalog id
    select c.id into strict _catalog_id
    from ai.semantic_catalog c
    where c.catalog_name = _catalog_name
    ;
    
    if _embedding_name is null then
        select 'emb' ||
        greatest
        ( count(*)::int4
        , max((regexp_match(e.embedding_name, '[0-9]+$'))[1]::int4)
        ) + 1
        into strict _embedding_name
        from ai.semantic_catalog_embedding e
        where e.semantic_catalog_id = _catalog_id
        ;
    end if;

    -- the name is spliced into DDL below as a bare (unquoted) identifier, so
    -- only accept names that form a valid unquoted identifier; this keeps
    -- arbitrary sql out of the alter table statements
    if _embedding_name !~ '^[[:alpha:]_][[:alnum:]_$]*$' then
        raise exception 'invalid embedding name: %', _embedding_name;
    end if;
    
    insert into ai.semantic_catalog_embedding (semantic_catalog_id, embedding_name, config)
    values (_catalog_id, _embedding_name, _config)
    returning * into strict _embedding
    ;
    
    -- add the columns
    -- the name stays unquoted (%s) so the resulting column name matches what
    -- sc_drop_embedding's unquoted "drop column" resolves to
    foreach _tbl in array array['obj', 'sql', 'fact']
    loop
        _sql = format
        (
        $sql$
            alter table ai.semantic_catalog_%s_%s add column %s @extschema:vector@.vector(%s)
        $sql$
        , _tbl
        , _catalog_id
        , _embedding_name
        , _dims
        );
        raise debug '%', _sql;
        execute _sql;
    end loop;
    
    perform ai._semantic_catalog_make_triggers(_catalog_id);
    
    return _embedding;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_drop_embedding
-- removes an embedding from a semantic catalog: deletes its
-- ai.semantic_catalog_embedding row, drops the matching column from the
-- catalog's obj, sql, and fact tables, and regenerates the catalog's update
-- triggers. raises when the catalog or the embedding does not exist.
create or replace function ai.sc_drop_embedding
( embedding_name name
, catalog_name name default 'default'
) returns void
as $func$
declare
    _embedding_name name = sc_drop_embedding.embedding_name;
    _catalog_name name = sc_drop_embedding.catalog_name;
    _removed ai.semantic_catalog_embedding;
    _catalog_id int4;
    _tbl text;
    _ddl text;
begin
    select cat.id into strict _catalog_id
    from ai.semantic_catalog cat
    where cat.catalog_name = _catalog_name
    ;

    -- "into strict" errors unless exactly one embedding row was deleted
    delete from ai.semantic_catalog_embedding emb
    where emb.semantic_catalog_id = _catalog_id
    and emb.embedding_name = _embedding_name
    returning * into strict _removed
    ;

    -- drop the embedding's column from each of the catalog's tables
    foreach _tbl in array array['obj', 'sql', 'fact']
    loop
        _ddl = format($sql$
            alter table ai.semantic_catalog_%s_%s drop column %s
        $sql$, _tbl, _catalog_id, _embedding_name);
        raise debug '%', _ddl;
        execute _ddl;
    end loop;

    -- regenerate the triggers so they no longer reference the dropped column
    perform ai._semantic_catalog_make_triggers(_catalog_id);
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- create_semantic_catalog
-- creates a semantic catalog: allocates an id from
-- ai.semantic_catalog_id_seq, records the catalog in ai.semantic_catalog,
-- creates its obj (database objects), sql (example sql) and fact tables, and
-- adds a first embedding via ai.sc_add_embedding. returns the new catalog id.
create or replace function ai.create_semantic_catalog
( catalog_name name default 'default'
, embedding_name name default null
, embedding_config jsonb default ai.embedding_sentence_transformers()
) returns int4
as $func$
declare
    _catalog_name name = create_semantic_catalog.catalog_name;
    _embedding_name name = create_semantic_catalog.embedding_name;
    _embedding_config jsonb = create_semantic_catalog.embedding_config;
    _catalog_id int4;
    _ddl text;
begin
    _catalog_id = nextval('ai.semantic_catalog_id_seq');

    insert into ai.semantic_catalog
    ( id
    , catalog_name
    , obj_table
    , sql_table
    , fact_table
    )
    values
    ( _catalog_id
    , _catalog_name
    , array['ai', format('semantic_catalog_obj_%s', _catalog_id)]
    , array['ai', format('semantic_catalog_sql_%s', _catalog_id)]
    , array['ai', format('semantic_catalog_fact_%s', _catalog_id)]
    )
    ;

    -- database objects
    _ddl = format($sql$
        create table ai.semantic_catalog_obj_%s
        ( id int8 not null primary key generated by default as identity
        , classid oid not null
        , objid oid not null
        , objsubid int4 not null
        , objtype text not null
        , objnames text[] not null
        , objargs text[] not null
        , description text
        , usage int8 not null default 0
        , unique (classid, objid, objsubid) deferrable initially immediate
        , unique (objtype, objnames, objargs) deferrable initially immediate
        )
      $sql$, _catalog_id);
    raise debug '%', _ddl;
    execute _ddl;

    -- example sql
    _ddl = format($sql$
        create table ai.semantic_catalog_sql_%s
        ( id int8 not null primary key generated by default as identity
        , sql text not null
        , description text not null
        , usage int8 not null default 0
        )
      $sql$, _catalog_id);
    raise debug '%', _ddl;
    execute _ddl;

    -- facts
    _ddl = format($sql$
        create table ai.semantic_catalog_fact_%s
        ( id int8 not null primary key generated by default as identity
        , description text not null
        , usage int8 not null default 0
        )
      $sql$, _catalog_id);
    raise debug '%', _ddl;
    execute _ddl;

    -- the first embedding also installs the update triggers
    perform ai.sc_add_embedding
    ( catalog_name=>_catalog_name
    , embedding_name=>_embedding_name
    , config=>_embedding_config
    );

    return _catalog_id;
end;
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- drop_semantic_catalog
-- deletes the named catalog's ai.semantic_catalog row, drops its obj, sql, and
-- fact tables (when present) together with their generated trigger functions,
-- and returns the id the catalog had. raises when no catalog has that name.
create or replace function ai.drop_semantic_catalog(catalog_name name) returns int4
as $func$
declare
    _catalog_name name = drop_semantic_catalog.catalog_name;
    _catalog_id int4;
    _sql text;
    _tbl text;
begin
    delete from ai.semantic_catalog c
    where c.catalog_name = _catalog_name
    returning c.id into strict _catalog_id
    ;

    -- drop the table for database objects
    _sql = format
    ( $sql$
        drop table if exists ai.semantic_catalog_obj_%s
      $sql$
    , _catalog_id
    );
    raise debug '%', _sql;
    execute _sql;
    
    -- drop the table for example sql
    _sql = format
    ( $sql$
        drop table if exists ai.semantic_catalog_sql_%s
      $sql$
    , _catalog_id
    );
    raise debug '%', _sql;
    execute _sql;
    
    -- drop the table for facts
    _sql = format
    ( $sql$
        drop table if exists ai.semantic_catalog_fact_%s
      $sql$
    , _catalog_id
    );
    raise debug '%', _sql;
    execute _sql;
    
    -- drop trigger functions (dropping the tables removed the triggers, but
    -- not the functions they executed).
    -- iterate over an array: "values ('obj', 'sql', 'fact')" is a single row
    -- of three columns, so looping over it only ever visited 'obj'
    foreach _tbl in array array['obj', 'sql', 'fact']
    loop
        _sql = format
        ( $sql$
            drop function if exists ai.semantic_catalog_%s_%s_trig()
          $sql$
        , _tbl
        , _catalog_id
        );
        raise debug '%', _sql;
        execute _sql;
    end loop;
    
    return _catalog_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_grant_read
-- grants role_name read access to one semantic catalog: usage on schema ai,
-- select on ai.semantic_catalog and ai.semantic_catalog_embedding, and select
-- on the catalog's obj, sql, and fact tables. raises when the catalog does not
-- exist.
create or replace function ai.sc_grant_read(catalog_name name, role_name name) returns void
as $func$
declare
    _catalog_name name = sc_grant_read.catalog_name;
    _role_name name = sc_grant_read.role_name;
    _catalog_id int;
    _template text;
    _sql text;
begin
    -- "into strict" both validates the catalog and guarantees a single id
    select cat.id into strict _catalog_id
    from ai.semantic_catalog cat
    where cat.catalog_name = _catalog_name
    ;

    _sql = format($sql$grant usage on schema ai to %I$sql$, _role_name);
    raise debug '%', _sql;
    execute _sql;

    -- shared catalog metadata
    foreach _template in array array[
        $sql$grant select on ai.semantic_catalog to %I$sql$,
        $sql$grant select on ai.semantic_catalog_embedding to %I$sql$
    ]
    loop
        _sql = format(_template, _role_name);
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- the catalog's own tables
    foreach _template in array array[
        $sql$grant select on ai.semantic_catalog_obj_%s to %I$sql$,
        $sql$grant select on ai.semantic_catalog_sql_%s to %I$sql$,
        $sql$grant select on ai.semantic_catalog_fact_%s to %I$sql$
    ]
    loop
        _sql = format(_template, _catalog_id, _role_name);
        raise debug '%', _sql;
        execute _sql;
    end loop;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_grant_write
-- grants role_name read/write access to one semantic catalog: usage on schema
-- ai; select on ai.semantic_catalog and ai.semantic_catalog_embedding; select,
-- insert, update and delete on the catalog's obj, sql, and fact tables; and
-- usage, select and update on those tables' id sequences. raises when the
-- catalog does not exist.
create or replace function ai.sc_grant_write(catalog_name name, role_name name) returns void
as $func$
declare
    _catalog_name name = sc_grant_write.catalog_name;
    _role_name name = sc_grant_write.role_name;
    _catalog_id int;
    _template text;
    _sql text;
begin
    -- "into strict" both validates the catalog and guarantees a single id
    select cat.id into strict _catalog_id
    from ai.semantic_catalog cat
    where cat.catalog_name = _catalog_name
    ;

    _sql = format($sql$grant usage on schema ai to %I$sql$, _role_name);
    raise debug '%', _sql;
    execute _sql;

    -- shared catalog metadata (read only)
    foreach _template in array array[
        $sql$grant select on ai.semantic_catalog to %I$sql$,
        $sql$grant select on ai.semantic_catalog_embedding to %I$sql$
    ]
    loop
        _sql = format(_template, _role_name);
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- the catalog's own tables and their id sequences
    foreach _template in array array[
        $sql$grant select, insert, update, delete on ai.semantic_catalog_obj_%s to %I$sql$,
        $sql$grant usage, select, update on sequence ai.semantic_catalog_obj_%s_id_seq to %I$sql$,
        $sql$grant select, insert, update, delete on ai.semantic_catalog_sql_%s to %I$sql$,
        $sql$grant usage, select, update on sequence ai.semantic_catalog_sql_%s_id_seq to %I$sql$,
        $sql$grant select, insert, update, delete on ai.semantic_catalog_fact_%s to %I$sql$,
        $sql$grant usage, select, update on sequence ai.semantic_catalog_fact_%s_id_seq to %I$sql$
    ]
    loop
        _sql = format(_template, _catalog_id, _role_name);
        raise debug '%', _sql;
        execute _sql;
    end loop;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_grant_admin
-- grants role_name administrative access to semantic catalogs: usage on schema
-- ai; select, insert, update, delete and truncate on ai.semantic_catalog and
-- ai.semantic_catalog_embedding plus usage, select and update on their id
-- sequences; and select, insert, update and delete (with sequence usage) on
-- the obj, sql, and fact tables of every catalog that exists at call time.
create or replace function ai.sc_grant_admin(role_name name) returns void
as $func$
declare
    _role_name name = sc_grant_admin.role_name;
    _catalog_id int8;
    _template text;
    _sql text;
begin

    _sql = format($sql$grant usage on schema ai to %I$sql$, _role_name);
    raise debug '%', _sql;
    execute _sql;

    -- shared catalog metadata and its sequences
    foreach _template in array array[
        $sql$grant select, insert, update, delete, truncate on ai.semantic_catalog to %I$sql$,
        $sql$grant usage, select, update on sequence ai.semantic_catalog_id_seq to %I$sql$,
        $sql$grant select, insert, update, delete, truncate on ai.semantic_catalog_embedding to %I$sql$,
        $sql$grant usage, select, update on sequence ai.semantic_catalog_embedding_id_seq to %I$sql$
    ]
    loop
        _sql = format(_template, _role_name);
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- tables and sequences of every existing catalog
    for _catalog_id in
        select cat.id from ai.semantic_catalog cat
    loop
        foreach _template in array array[
            $sql$grant select, insert, update, delete on ai.semantic_catalog_obj_%s to %I$sql$,
            $sql$grant usage, select, update on sequence ai.semantic_catalog_obj_%s_id_seq to %I$sql$,
            $sql$grant select, insert, update, delete on ai.semantic_catalog_sql_%s to %I$sql$,
            $sql$grant usage, select, update on sequence ai.semantic_catalog_sql_%s_id_seq to %I$sql$,
            $sql$grant select, insert, update, delete on ai.semantic_catalog_fact_%s to %I$sql$,
            $sql$grant usage, select, update on sequence ai.semantic_catalog_fact_%s_id_seq to %I$sql$
        ]
        loop
            _sql = format(_template, _catalog_id, _role_name);
            raise debug '%', _sql;
            execute _sql;
        end loop;
    end loop;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- _sc_obj
-- returns id, classid, objid, objsubid, objtype, objnames, objargs and
-- description for every row of ai.semantic_catalog_obj_<catalog_id>.
-- the table name depends on the catalog id, hence the dynamic query.
create or replace function ai._sc_obj(catalog_id int)
returns table
( id int8
, classid oid
, objid oid
, objsubid int4
, objtype text
, objnames text[]
, objargs text[]
, description text
)
as $func$
begin
    return query execute format
    ( $sql$
        select
          id
        , classid
        , objid
        , objsubid
        , objtype
        , objnames
        , objargs
        , description
        from ai.semantic_catalog_obj_%s
      $sql$
    , catalog_id
    );
end
$func$ language plpgsql stable security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_grant_obj_read
create or replace function ai.sc_grant_obj_read(catalog_name name, role_name name) returns void
as $func$
/*
    grants select/execute on all database objects referenced in the specified catalog
    grants usage on the schemas to which those objects belong

    only whole objects (objsubid = 0) are considered; column entries are skipped.
    raises if the catalog does not exist or if role_name cannot yet select from the
    catalog's obj table.
*/
declare
    _catalog_name name = sc_grant_obj_read.catalog_name;
    _role_name name = sc_grant_obj_read.role_name;
    _catalog_id int;
    _sql text;
begin
    select x.id into strict _catalog_id
    from ai.semantic_catalog x
    where x.catalog_name = _catalog_name
    ;
    
    if not has_table_privilege
        ( _role_name
        , format('ai.semantic_catalog_obj_%s', _catalog_id)
        , 'select'
        ) then
        raise exception 'user must have access to the catalog first';
    end if;

    -- schemas
    for _sql in
    (
        select format
        ( $sql$grant usage on schema %I to %I$sql$
        , x.schema_name
        , _role_name
        )
        from
        (
            select distinct x.objnames[1] as schema_name
            from ai._sc_obj(_catalog_id) x
            where x.objsubid = 0
        ) x
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;

    -- objects
    for _sql in
    (
        select format
        ( $sql$grant %s on %s %I.%I%s to %I$sql$
        , case when x.objtype in ('aggregate', 'function', 'procedure')
            then 'execute'
            else 'select'
          end
        -- grant has no "on view" (or materialized view / foreign table) form;
        -- every relation kind is granted with "on table"
        , case
            when x.objtype in ('function', 'aggregate') then 'function'
            when x.objtype in ('view', 'materialized view', 'foreign table') then 'table'
            else x.objtype
          end
        , x.objnames[1]
        , x.objnames[2]
        -- routines need their argument list to identify the overload
        , case when x.objtype in ('aggregate', 'function', 'procedure')
            then format('(%s)', array_to_string(x.objargs, ', '))
            else ''
          end
        , _role_name
        )
        from ai._sc_obj(_catalog_id) x
        where x.objsubid = 0
        order by x.objnames
    )
    loop
        raise debug '%', _sql;
        execute _sql;
    end loop;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_obj_desc
-- upserts the description of a database object in a semantic catalog's obj
-- table. rows are matched on (classid, objid, objsubid): a matching row only
-- gets its description replaced, otherwise a new row is inserted with all the
-- given values. returns the id of the affected row. raises when the catalog
-- does not exist.
create or replace function ai.sc_set_obj_desc
( classid oid
, objid oid
, objsubid integer
, objtype text
, objnames text[]
, objargs text[]
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
declare
    _catalog_name name = sc_set_obj_desc.catalog_name;
    _catalog_id int8;
    _row_id int8;
begin
    select cat.id into strict _catalog_id
    from ai.semantic_catalog cat
    where cat.catalog_name = _catalog_name
    ;

    -- values travel as bind parameters; only the catalog id is spliced in.
    -- note: merge ... returning needs postgresql 17 or later
    execute format
    ( $sql$
        merge into ai.semantic_catalog_obj_%s tgt
        using
        (
            select
              $1 as classid
            , $2 as objid
            , $3 as objsubid
            , $4 as objtype
            , $5 as objnames
            , $6 as objargs
            , $7 as description
        ) src
        on (tgt.classid = src.classid and tgt.objid = src.objid and tgt.objsubid = src.objsubid)
        when matched then update set description = src.description
        when not matched by target then
        insert
        ( classid
        , objid
        , objsubid
        , objtype
        , objnames
        , objargs
        , description
        )
        values
        ( src.classid
        , src.objid
        , src.objsubid
        , src.objtype
        , src.objnames
        , src.objargs
        , src.description
        )
        returning id
      $sql$
    , _catalog_id
    )
    into strict _row_id
    using
      sc_set_obj_desc.classid
    , sc_set_obj_desc.objid
    , sc_set_obj_desc.objsubid
    , sc_set_obj_desc.objtype
    , sc_set_obj_desc.objnames
    , sc_set_obj_desc.objargs
    , sc_set_obj_desc.description
    ;

    return _row_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_table_desc
-- sets the description of a table in a semantic catalog by delegating to
-- ai.sc_set_obj_desc with objsubid 0, objtype 'table', objnames
-- {schema_name, table_name} and empty objargs. returns the obj row id.
create or replace function ai.sc_set_table_desc
( classid oid
, objid oid
, schema_name name
, table_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( classid => classid
    , objid => objid
    , objsubid => 0
    , objtype => 'table'
    , objnames => array[schema_name, table_name]
    , objargs => array[]::text[]
    , description => description
    , catalog_name => catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_table_col_desc
-- sets the description of a table column in a semantic catalog by delegating
-- to ai.sc_set_obj_desc with objtype 'table column', objnames
-- {schema_name, table_name, column_name} and empty objargs. returns the obj
-- row id.
create or replace function ai.sc_set_table_col_desc
( classid oid
, objid oid
, objsubid int4
, schema_name name
, table_name name
, column_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( classid => classid
    , objid => objid
    , objsubid => objsubid
    , objtype => 'table column'
    , objnames => array[schema_name, table_name, column_name]
    , objargs => array[]::text[]
    , description => description
    , catalog_name => catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_view_desc
-- sets the description of a view in a semantic catalog by delegating to
-- ai.sc_set_obj_desc with objsubid 0, objtype 'view', objnames
-- {schema_name, view_name} and empty objargs. returns the obj row id.
create or replace function ai.sc_set_view_desc
( classid oid
, objid oid
, schema_name name
, view_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( classid => classid
    , objid => objid
    , objsubid => 0
    , objtype => 'view'
    , objnames => array[schema_name, view_name]
    , objargs => array[]::text[]
    , description => description
    , catalog_name => catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_view_col_desc
-- sets the description of a view column in a semantic catalog by delegating
-- to ai.sc_set_obj_desc with objtype 'view column', objnames
-- {schema_name, view_name, column_name} and empty objargs. returns the obj
-- row id.
create or replace function ai.sc_set_view_col_desc
( classid oid
, objid oid
, objsubid int4
, schema_name name
, view_name name
, column_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( classid => classid
    , objid => objid
    , objsubid => objsubid
    , objtype => 'view column'
    , objnames => array[schema_name, view_name, column_name]
    , objargs => array[]::text[]
    , description => description
    , catalog_name => catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_proc_desc
-- Record a description for a procedure given its raw catalog coordinates
-- (classid / objid), its schema and name, and its argument type list.
-- A null objargs is passed on as an empty array. Thin wrapper over the
-- 8-argument ai.sc_set_obj_desc with object type 'procedure' and sub-id 0.
create or replace function ai.sc_set_proc_desc
( classid oid
, objid oid
, schema_name name
, proc_name name
, objargs text[]
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( sc_set_proc_desc.classid
    , sc_set_proc_desc.objid
    , 0
    , 'procedure'
    , array
      [ sc_set_proc_desc.schema_name
      , sc_set_proc_desc.proc_name
      ]
    , coalesce(sc_set_proc_desc.objargs, '{}'::text[])
    , sc_set_proc_desc.description
    , sc_set_proc_desc.catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_func_desc
-- Record a description for a function given its raw catalog coordinates
-- (classid / objid), its schema and name, and its argument type list.
-- A null objargs is passed on as an empty array. Thin wrapper over the
-- 8-argument ai.sc_set_obj_desc with object type 'function' and sub-id 0.
create or replace function ai.sc_set_func_desc
( classid oid
, objid oid
, schema_name name
, func_name name
, objargs text[]
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( sc_set_func_desc.classid
    , sc_set_func_desc.objid
    , 0
    , 'function'
    , array
      [ sc_set_func_desc.schema_name
      , sc_set_func_desc.func_name
      ]
    , coalesce(sc_set_func_desc.objargs, '{}'::text[])
    , sc_set_func_desc.description
    , sc_set_func_desc.catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_agg_desc
-- Record a description for an aggregate given its raw catalog coordinates
-- (classid / objid), its schema and name, and its argument type list.
-- A null objargs is passed on as an empty array. Thin wrapper over the
-- 8-argument ai.sc_set_obj_desc with object type 'aggregate' and sub-id 0.
create or replace function ai.sc_set_agg_desc
( classid oid
, objid oid
, schema_name name
, agg_name name
, objargs text[]
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( sc_set_agg_desc.classid
    , sc_set_agg_desc.objid
    , 0
    , 'aggregate'
    , array
      [ sc_set_agg_desc.schema_name
      , sc_set_agg_desc.agg_name
      ]
    , coalesce(sc_set_agg_desc.objargs, '{}'::text[])
    , sc_set_agg_desc.description
    , sc_set_agg_desc.catalog_name
    );
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_obj_desc
-- Record a description for any object addressed by (object type, name array,
-- argument array), the same addressing scheme pg_get_object_address accepts
-- (e.g. 'table', array['public', 'foo'], array[]::text[]).
-- The address is resolved to catalog coordinates (classid, objid, objsubid)
-- and then forwarded, with the textual address, to the 8-argument
-- ai.sc_set_obj_desc, whose result is returned.
-- A null objargs is treated as an empty array, as the other wrappers do.
create or replace function ai.sc_set_obj_desc
( objtype text
, objnames text[]
, objargs text[]
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
declare
    _objargs text[] = coalesce(objargs, array[]::text[]);
    _classid oid;
    _objid oid;
    _objsubid integer;
begin
    -- pg_get_object_address's output columns are classid, objid, objsubid
    select
      x.classid
    , x.objid
    , x.objsubid
    into strict
      _classid
    , _objid
    , _objsubid
    from pg_get_object_address(objtype, objnames, _objargs) x
    ;
    return ai.sc_set_obj_desc
    ( _classid
    , _objid
    , _objsubid
    , objtype
    , objnames
    , _objargs
    , description
    , catalog_name
    );
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_table_desc
-- Record a description for a table identified by regclass. Only ordinary,
-- partitioned and foreign tables (relkind 'r', 'p', 'f') qualify. The textual
-- address comes from pg_identify_object_as_address. For any other relkind no
-- row is produced, so nothing is recorded and the function returns null.
create or replace function ai.sc_set_table_desc
( t regclass
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_class'::regclass::oid
    , sc_set_table_desc.t
    , 0
    , addr.type
    , addr.object_names
    , addr.object_args
    , sc_set_table_desc.description
    , sc_set_table_desc.catalog_name
    )
    from pg_identify_object_as_address
    ( 'pg_catalog.pg_class'::regclass::oid
    , sc_set_table_desc.t
    , 0
    ) addr
    where exists
    ( select 1
      from pg_class rel
      where rel.oid = sc_set_table_desc.t
      and rel.relkind in ('r', 'p', 'f')
    )
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_table_col_desc
-- Record a description for one column of a table (relkind 'r', 'p' or 'f')
-- identified by regclass and column name. Only live user columns match.
-- System columns (attnum <= 0, e.g. ctid, xmin) and dropped columns are
-- excluded. Returns null and records nothing when the relation is not a
-- table or no such column exists.
create or replace function ai.sc_set_table_col_desc
( t regclass
, column_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_class'::regclass::oid
    , t
    , a.attnum
    , x.type
    , x.object_names
    , x.object_args
    , description
    , catalog_name
    )
    from pg_class k
    inner join pg_attribute a on (k.oid = a.attrelid)
    cross join lateral pg_identify_object_as_address
    ( 'pg_catalog.pg_class'::regclass::oid
    , t
    , a.attnum
    ) x
    where k.oid = t
    and k.relkind in ('r', 'p', 'f')
    and a.attname = column_name
    -- user columns only: skip system columns and dropped columns
    and a.attnum > 0
    and not a.attisdropped
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_view_desc
-- Record a description for a view or materialized view (relkind 'v' or 'm')
-- identified by regclass. The textual address comes from
-- pg_identify_object_as_address. For any other relkind no row is produced,
-- so nothing is recorded and the function returns null.
create or replace function ai.sc_set_view_desc
( v regclass
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_class'::regclass::oid
    , sc_set_view_desc.v
    , 0
    , addr.type
    , addr.object_names
    , addr.object_args
    , sc_set_view_desc.description
    , sc_set_view_desc.catalog_name
    )
    from pg_identify_object_as_address
    ( 'pg_catalog.pg_class'::regclass::oid
    , sc_set_view_desc.v
    , 0
    ) addr
    where exists
    ( select 1
      from pg_class rel
      where rel.oid = sc_set_view_desc.v
      and rel.relkind in ('v', 'm')
    )
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_view_col_desc
-- Record a description for one column of a view or materialized view
-- (relkind 'v' or 'm') identified by regclass and column name. Only live
-- user columns match. System columns (attnum <= 0) and dropped columns are
-- excluded. Returns null and records nothing when the relation is not a view
-- or no such column exists.
create or replace function ai.sc_set_view_col_desc
( v regclass
, column_name name
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_class'::regclass::oid
    , v
    , a.attnum
    , x.type
    , x.object_names
    , x.object_args
    , description
    , catalog_name
    )
    from pg_class k
    inner join pg_attribute a on (k.oid = a.attrelid)
    cross join lateral pg_identify_object_as_address
    ( 'pg_catalog.pg_class'::regclass::oid
    , v
    , a.attnum
    ) x
    where k.oid = v
    and k.relkind in ('v', 'm')
    and a.attname = column_name
    -- user columns only: skip system columns and dropped columns
    and a.attnum > 0
    and not a.attisdropped
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_proc_desc
-- Record a description for a procedure (prokind 'p') identified by
-- regprocedure. The textual address comes from pg_identify_object_as_address.
-- If the routine is not a procedure no row is produced, so nothing is
-- recorded and the function returns null.
create or replace function ai.sc_set_proc_desc
( p regprocedure
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_proc_desc.p
    , 0
    , addr.type
    , addr.object_names
    , addr.object_args
    , sc_set_proc_desc.description
    , sc_set_proc_desc.catalog_name
    )
    from pg_identify_object_as_address
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_proc_desc.p
    , 0
    ) addr
    where exists
    ( select 1
      from pg_proc fn
      where fn.oid = sc_set_proc_desc.p
      and fn.prokind = 'p'
    )
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_func_desc
-- Record a description for a function (prokind 'f' or 'w') identified by
-- regprocedure. The textual address comes from pg_identify_object_as_address.
-- If the routine has any other prokind no row is produced, so nothing is
-- recorded and the function returns null.
create or replace function ai.sc_set_func_desc
( f regprocedure
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_func_desc.f
    , 0
    , addr.type
    , addr.object_names
    , addr.object_args
    , sc_set_func_desc.description
    , sc_set_func_desc.catalog_name
    )
    from pg_identify_object_as_address
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_func_desc.f
    , 0
    ) addr
    where exists
    ( select 1
      from pg_proc fn
      where fn.oid = sc_set_func_desc.f
      and fn.prokind in ('f', 'w')
    )
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_set_agg_desc
-- Record a description for an aggregate (prokind 'a') identified by
-- regprocedure. The textual address comes from pg_identify_object_as_address.
-- If the routine is not an aggregate no row is produced, so nothing is
-- recorded and the function returns null.
create or replace function ai.sc_set_agg_desc
( a regprocedure
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
    select ai.sc_set_obj_desc
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_agg_desc.a
    , 0
    , addr.type
    , addr.object_names
    , addr.object_args
    , sc_set_agg_desc.description
    , sc_set_agg_desc.catalog_name
    )
    from pg_identify_object_as_address
    ( 'pg_catalog.pg_proc'::regclass::oid
    , sc_set_agg_desc.a
    , 0
    ) addr
    where exists
    ( select 1
      from pg_proc fn
      where fn.oid = sc_set_agg_desc.a
      and fn.prokind = 'a'
    )
    ;
$func$ language sql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_add_sql_desc
-- Add an example SQL statement and its description to the semantic catalog
-- named catalog_name. The row goes into ai.semantic_catalog_sql_<id>, where
-- <id> is that catalog's ai.semantic_catalog.id. Returns the new row's id.
-- Raises (via STRICT) unless exactly one catalog has the given name.
create or replace function ai.sc_add_sql_desc
( sql text
, description text
, catalog_name name default 'default'
)
returns int8
as $func$
declare
    _catalog_id ai.semantic_catalog.id%type;
    _new_id int8;
begin
    select c.id
    into strict _catalog_id
    from ai.semantic_catalog c
    where c.catalog_name = sc_add_sql_desc.catalog_name
    ;

    -- the target table name depends on the catalog id, hence dynamic SQL;
    -- the values themselves are bound as parameters
    execute format
    ( 'insert into ai.semantic_catalog_sql_%s (sql, description) values ($1, $2) returning id'
    , _catalog_id
    )
    into strict _new_id
    using sc_add_sql_desc.sql, sc_add_sql_desc.description
    ;

    return _new_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_update_sql_desc
-- Overwrite the SQL text and description of example row `id` in the semantic
-- catalog named catalog_name (table ai.semantic_catalog_sql_<catalog id>).
-- Raises (via STRICT) unless exactly one catalog has the given name. An id
-- that matches no row updates nothing and is not reported.
create or replace function ai.sc_update_sql_desc
( id int8
, sql text
, description text
, catalog_name name default 'default'
)
returns void
as $func$
declare
    _catalog_id ai.semantic_catalog.id%type;
begin
    select c.id
    into strict _catalog_id
    from ai.semantic_catalog c
    where c.catalog_name = sc_update_sql_desc.catalog_name
    ;

    execute format
    ( 'update ai.semantic_catalog_sql_%s set sql = $1, description = $2 where id = $3'
    , _catalog_id
    )
    using sc_update_sql_desc.sql
        , sc_update_sql_desc.description
        , sc_update_sql_desc.id
    ;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_add_fact
-- Add a free-text fact to the semantic catalog named catalog_name. The row
-- goes into ai.semantic_catalog_fact_<id>, where <id> is that catalog's
-- ai.semantic_catalog.id. Returns the new fact's id.
-- Raises (via STRICT) unless exactly one catalog has the given name.
create or replace function ai.sc_add_fact
( description text
, catalog_name name default 'default'
)
returns int8
as $func$
declare
    _catalog_id ai.semantic_catalog.id%type;
    _new_id int8;
begin
    select c.id
    into strict _catalog_id
    from ai.semantic_catalog c
    where c.catalog_name = sc_add_fact.catalog_name
    ;

    execute format
    ( 'insert into ai.semantic_catalog_fact_%s (description) values ($1) returning id'
    , _catalog_id
    )
    into strict _new_id
    using sc_add_fact.description
    ;

    return _new_id;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

-------------------------------------------------------------------------------
-- sc_update_fact
-- Overwrite the description of fact `id` in the semantic catalog named
-- catalog_name (table ai.semantic_catalog_fact_<catalog id>).
-- Raises (via STRICT) unless exactly one catalog has the given name. An id
-- that matches no row updates nothing and is not reported.
create or replace function ai.sc_update_fact
( id int8
, description text
, catalog_name name default 'default'
)
returns void
as $func$
declare
    _catalog_id ai.semantic_catalog.id%type;
begin
    select c.id
    into strict _catalog_id
    from ai.semantic_catalog c
    where c.catalog_name = sc_update_fact.catalog_name
    ;

    execute format
    ( 'update ai.semantic_catalog_fact_%s set description = $1 where id = $2'
    , _catalog_id
    )
    using sc_update_fact.description, sc_update_fact.id
    ;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;



--------------------------------------------------------------------------------
-- 999-privileges.sql
-- Grant role `to_user` the privileges needed to use vectorizers.
--   admin = false: usage + create on schema ai, DML on ai.vectorizer, select
--                  on the vectorizer error/status objects, and select + usage
--                  on sequence ai.vectorizer_id_seq.
--   admin = true:  all privileges on schema ai, on the pgai_lib_* migration/
--                  version/feature-flag tables, on the vectorizer objects and
--                  on sequence ai.vectorizer_id_seq.
-- to_user is a role name and is quoted with %I. Mixed-case or special-
-- character role names are therefore granted correctly, and the value cannot
-- inject SQL into the GRANT statements.
-- NOTE(review): a pre-quoted name or a keyword role spec such as current_user
-- is now treated as a literal role name. Pass the plain role name.
-- Runs as the caller (security invoker): the caller must hold the privileges
-- being granted.
create or replace function ai.grant_vectorizer_usage(to_user pg_catalog.name, admin pg_catalog.bool default false) returns void
as $func$
begin
    if not admin then
        execute format('grant usage, create on schema ai to %I', to_user);
        execute format('grant select, insert, update, delete on table ai.vectorizer to %I', to_user);
        execute format('grant select on ai._vectorizer_errors to %I', to_user);
        execute format('grant select on ai.vectorizer_errors to %I', to_user);
        execute format('grant select on ai.vectorizer_status to %I', to_user);
        execute format('grant select, usage on sequence ai.vectorizer_id_seq to %I', to_user);
    else
        execute format('grant all privileges on schema ai to %I', to_user);
        execute format('grant all privileges on table ai.pgai_lib_migration to %I', to_user);
        execute format('grant all privileges on table ai.pgai_lib_version to %I', to_user);
        execute format('grant all privileges on table ai.pgai_lib_feature_flag to %I', to_user);
        execute format('grant all privileges on table ai.vectorizer to %I', to_user);
        execute format('grant all privileges on table ai._vectorizer_errors to %I', to_user);
        execute format('grant all privileges on table ai.vectorizer_errors to %I', to_user);
        execute format('grant all privileges on table ai.vectorizer_status to %I', to_user);
        execute format('grant all privileges on sequence ai.vectorizer_id_seq to %I', to_user);
    end if;
end
$func$ language plpgsql volatile
security invoker -- gotta have privs to give privs
set search_path to pg_catalog, pg_temp
;

